This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 55fe75d083 Purge existing SLA implementation (#42285)
55fe75d083 is described below
commit 55fe75d083499c6fbaaca35d6dbad4487f4a9ad0
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Sep 25 10:14:25 2024 -0700
Purge existing SLA implementation (#42285)
SLA will be reimplemented in either 3.0 or 3.1
---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 1 -
airflow/callbacks/callback_requests.py | 20 --
airflow/config_templates/config.yml | 7 -
airflow/dag_processing/manager.py | 47 +--
airflow/dag_processing/processor.py | 196 +---------
airflow/example_dags/example_sla_dag.py | 66 ----
airflow/jobs/scheduler_job_runner.py | 28 +-
airflow/models/baseoperator.py | 18 +-
airflow/models/dag.py | 18 +-
airflow/models/mappedoperator.py | 15 +-
airflow/serialization/enums.py | 1 -
airflow/serialization/serialized_objects.py | 8 +-
airflow/settings.py | 3 -
.../notifications/chime_notifier_howto_guide.rst | 4 -
.../notifications/sns.rst | 5 -
.../notifications/sqs.rst | 5 -
.../pagerduty_notifier_howto_guide.rst | 4 -
.../notifications/slack_notifier_howto_guide.rst | 4 -
.../slackwebhook_notifier_howto_guide.rst | 4 -
.../notifications/smtp_notifier_howto_guide.rst | 4 -
.../logging-monitoring/callbacks.rst | 1 -
.../logging-monitoring/metrics.rst | 4 -
docs/apache-airflow/core-concepts/tasks.rst | 73 +---
docs/conf.py | 1 -
newsfragments/42285.significant.rst | 1 +
tests/callbacks/test_callback_requests.py | 9 -
tests/dag_processing/test_job_runner.py | 34 +-
tests/dag_processing/test_processor.py | 394 +--------------------
tests/jobs/test_scheduler_job.py | 78 +---
tests/models/test_baseoperator.py | 45 ---
tests/serialization/test_dag_serialization.py | 5 -
31 files changed, 39 insertions(+), 1064 deletions(-)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index e4a5069b29..8716d9c9cc 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -101,7 +101,6 @@ def initialize_method_map() -> dict[str, Callable]:
DagFileProcessor._execute_task_callbacks,
DagFileProcessor.execute_callbacks,
DagFileProcessor.execute_callbacks_without_dag,
- DagFileProcessor.manage_slas,
DagFileProcessor.save_dag_to_db,
DagFileProcessor.update_import_errors,
DagFileProcessor._validate_task_pools_and_update_dag_warnings,
diff --git a/airflow/callbacks/callback_requests.py
b/airflow/callbacks/callback_requests.py
index 7158c45d44..07ad648e96 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -137,23 +137,3 @@ class DagCallbackRequest(CallbackRequest):
self.dag_id = dag_id
self.run_id = run_id
self.is_failure_callback = is_failure_callback
-
-
-class SlaCallbackRequest(CallbackRequest):
- """
- A class with information about the SLA callback to be executed.
-
- :param full_filepath: File Path to use to run the callback
- :param dag_id: DAG ID
- :param processor_subdir: Directory used by Dag Processor when parsed the
dag.
- """
-
- def __init__(
- self,
- full_filepath: str,
- dag_id: str,
- processor_subdir: str | None,
- msg: str | None = None,
- ):
- super().__init__(full_filepath, processor_subdir=processor_subdir,
msg=msg)
- self.dag_id = dag_id
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 3bef18058d..c9abee3c85 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -395,13 +395,6 @@ core:
type: integer
example: ~
default: "30"
- check_slas:
- description: |
- On each dagrun check against defined SLAs
- version_added: 1.10.8
- type: string
- example: ~
- default: "True"
xcom_backend:
description: |
Path to custom XCom class that will be used to store and resolve
operators results
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 6df8060f3a..05fb72daee 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -42,7 +42,7 @@ from tabulate import tabulate
import airflow.models
from airflow.api_internal.internal_api_call import internal_api_call
-from airflow.callbacks.callback_requests import CallbackRequest,
SlaCallbackRequest
+from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models.dag import DagModel
@@ -752,40 +752,17 @@ class DagFileProcessorManager(LoggingMixin):
return callback_queue
def _add_callback_to_queue(self, request: CallbackRequest):
- # requests are sent by dag processors. SLAs exist per-dag, but can be
generated once per SLA-enabled
- # task in the dag. If treated like other callbacks, SLAs can cause
feedback where a SLA arrives,
- # goes to the front of the queue, gets processed, triggers more SLAs
from the same DAG, which go to
- # the front of the queue, and we never get round to picking stuff off
the back of the queue
- if isinstance(request, SlaCallbackRequest):
- if request in self._callback_to_execute[request.full_filepath]:
- self.log.debug("Skipping already queued SlaCallbackRequest")
- return
-
- # not already queued, queue the callback
- # do NOT add the file of this SLA to self._file_path_queue. SLAs
can arrive so rapidly that
- # they keep adding to the file queue and never letting it drain.
This in turn prevents us from
- # ever rescanning the dags folder for changes to existing dags. We
simply store the callback, and
- # periodically, when self._file_path_queue is drained, we rescan
and re-queue all DAG files.
- # The SLAs will be picked up then. It means a delay in reacting to
the SLAs (as controlled by the
- # min_file_process_interval config) but stops SLAs from DoS'ing
the queue.
- self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
- self._callback_to_execute[request.full_filepath].append(request)
- Stats.incr("dag_processing.sla_callback_count")
-
- # Other callbacks have a higher priority over DAG Run scheduling, so
those callbacks gazump, even if
- # already in the file path queue
- else:
- self.log.debug("Queuing %s CallbackRequest: %s",
type(request).__name__, request)
- self._callback_to_execute[request.full_filepath].append(request)
- if request.full_filepath in self._file_path_queue:
- # Remove file paths matching request.full_filepath from
self._file_path_queue
- # Since we are already going to use that filepath to run
callback,
- # there is no need to have same file path again in the queue
- self._file_path_queue = deque(
- file_path for file_path in self._file_path_queue if
file_path != request.full_filepath
- )
- self._add_paths_to_queue([request.full_filepath], True)
- Stats.incr("dag_processing.other_callback_count")
+ self.log.debug("Queuing %s CallbackRequest: %s",
type(request).__name__, request)
+ self._callback_to_execute[request.full_filepath].append(request)
+ if request.full_filepath in self._file_path_queue:
+ # Remove file paths matching request.full_filepath from
self._file_path_queue
+ # Since we are already going to use that filepath to run callback,
+ # there is no need to have same file path again in the queue
+ self._file_path_queue = deque(
+ file_path for file_path in self._file_path_queue if file_path
!= request.full_filepath
+ )
+ self._add_paths_to_queue([request.full_filepath], True)
+ Stats.incr("dag_processing.other_callback_count")
def _refresh_requested_filelocs(self) -> None:
"""Refresh filepaths from dag dir as requested by users via APIs."""
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 0b19d8f2db..f030cb7501 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -25,33 +25,28 @@ import time
import zipfile
from contextlib import contextmanager, redirect_stderr, redirect_stdout,
suppress
from dataclasses import dataclass
-from datetime import timedelta
-from typing import TYPE_CHECKING, Generator, Iterable, Iterator
+from typing import TYPE_CHECKING, Generator, Iterable
from setproctitle import setproctitle
-from sqlalchemy import delete, event, func, or_, select
+from sqlalchemy import delete, event
from airflow import settings
-from airflow.api_internal.internal_api_call import InternalApiConfig,
internal_api_call
+from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
- SlaCallbackRequest,
TaskCallbackRequest,
)
from airflow.configuration import conf
-from airflow.exceptions import AirflowException, TaskNotFound
+from airflow.exceptions import AirflowException
from airflow.listeners.listener import get_listener_manager
-from airflow.models import SlaMiss
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
-from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance, TaskInstance as TI,
_run_finished_callback
+from airflow.models.taskinstance import TaskInstance, _run_finished_callback
from airflow.stats import Stats
from airflow.utils import timezone
-from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import iter_airflow_imports, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter,
set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
@@ -440,180 +435,6 @@ class DagFileProcessor(LoggingMixin):
self.dag_warnings: set[tuple[str, str]] = set()
self._last_num_of_db_queries = 0
- @classmethod
- @internal_api_call
- @provide_session
- def manage_slas(cls, dag_folder, dag_id: str, session: Session =
NEW_SESSION) -> None:
- """
- Find all tasks that have SLAs defined, and send alert emails when
needed.
-
- New SLA misses are also recorded in the database.
-
- We are assuming that the scheduler runs often, so we only check for
- tasks that should have succeeded in the past hour.
- """
- dagbag = DagFileProcessor._get_dagbag(dag_folder)
- dag = dagbag.get_dag(dag_id)
- cls.logger().info("Running SLA Checks for %s", dag.dag_id)
- if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
- cls.logger().info("Skipping SLA check for %s because no tasks in
DAG have SLAs", dag)
- return
- qry = (
- select(TI.task_id, func.max(DR.execution_date).label("max_ti"))
- .join(TI.dag_run)
- .where(TI.dag_id == dag.dag_id)
- .where(or_(TI.state == TaskInstanceState.SUCCESS, TI.state ==
TaskInstanceState.SKIPPED))
- .where(TI.task_id.in_(dag.task_ids))
- .group_by(TI.task_id)
- .subquery("sq")
- )
- # get recorded SlaMiss
- recorded_slas_query = set(
- session.execute(
- select(SlaMiss.dag_id, SlaMiss.task_id,
SlaMiss.execution_date).where(
- SlaMiss.dag_id == dag.dag_id,
SlaMiss.task_id.in_(dag.task_ids)
- )
- )
- )
- max_tis: Iterator[TI] = session.scalars(
- select(TI)
- .join(TI.dag_run)
- .where(TI.dag_id == dag.dag_id, TI.task_id == qry.c.task_id,
DR.execution_date == qry.c.max_ti)
- )
-
- ts = timezone.utcnow()
-
- for ti in max_tis:
- task = dag.get_task(ti.task_id)
- if not task.sla:
- continue
-
- if not isinstance(task.sla, timedelta):
- raise TypeError(
- f"SLA is expected to be timedelta object, got "
- f"{type(task.sla)} in {task.dag_id}:{task.task_id}"
- )
-
- sla_misses = []
- next_info =
dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False)
- while next_info and next_info.logical_date < ts:
- next_info = dag.next_dagrun_info(next_info.data_interval,
restricted=False)
-
- if next_info is None:
- break
- if (ti.dag_id, ti.task_id, next_info.logical_date) in
recorded_slas_query:
- continue
- if next_info.logical_date + task.sla < ts:
- sla_miss = SlaMiss(
- task_id=ti.task_id,
- dag_id=ti.dag_id,
- execution_date=next_info.logical_date,
- timestamp=ts,
- )
- sla_misses.append(sla_miss)
- Stats.incr("sla_missed", tags={"dag_id": ti.dag_id,
"task_id": ti.task_id})
- if sla_misses:
- session.add_all(sla_misses)
- session.commit()
- slas: list[SlaMiss] = session.scalars(
- select(SlaMiss).where(~SlaMiss.notification_sent, SlaMiss.dag_id
== dag.dag_id)
- ).all()
- if slas:
- sla_dates: list[datetime] = [sla.execution_date for sla in slas]
- fetched_tis: list[TI] = session.scalars(
- select(TI).where(
- TI.dag_id == dag.dag_id,
- TI.execution_date.in_(sla_dates),
- TI.state != TaskInstanceState.SUCCESS,
- )
- ).all()
- blocking_tis: list[TI] = []
- for ti in fetched_tis:
- if ti.task_id in dag.task_ids:
- ti.task = dag.get_task(ti.task_id)
- blocking_tis.append(ti)
- else:
- session.delete(ti)
- session.commit()
-
- task_list = "\n".join(sla.task_id + " on " +
sla.execution_date.isoformat() for sla in slas)
- blocking_task_list = "\n".join(
- ti.task_id + " on " + ti.execution_date.isoformat() for ti in
blocking_tis
- )
- # Track whether email or any alert notification sent
- # We consider email or the alert callback as notifications
- email_sent = False
- notification_sent = False
- if dag.sla_miss_callback:
- # Execute the alert callback
- callbacks = (
- dag.sla_miss_callback
- if isinstance(dag.sla_miss_callback, list)
- else [dag.sla_miss_callback]
- )
- for callback in callbacks:
- cls.logger().info("Calling SLA miss callback %s", callback)
- try:
- callback(dag, task_list, blocking_task_list, slas,
blocking_tis)
- notification_sent = True
- except Exception:
- Stats.incr(
- "sla_callback_notification_failure",
- tags={
- "dag_id": dag.dag_id,
- "func_name": callback.__name__,
- },
- )
- cls.logger().exception(
- "Could not call sla_miss_callback(%s) for DAG %s",
- callback.__name__,
- dag.dag_id,
- )
- email_content = f"""\
- Here's a list of tasks that missed their SLAs:
- <pre><code>{task_list}\n<code></pre>
- Blocking tasks:
- <pre><code>{blocking_task_list}<code></pre>
- Airflow Webserver URL: {conf.get(section='webserver',
key='base_url')}
- """
-
- tasks_missed_sla = []
- for sla in slas:
- try:
- task = dag.get_task(sla.task_id)
- except TaskNotFound:
- # task already deleted from DAG, skip it
- cls.logger().warning(
- "Task %s doesn't exist in DAG anymore, skipping SLA
miss notification.", sla.task_id
- )
- else:
- tasks_missed_sla.append(task)
-
- emails: set[str] = set()
- for task in tasks_missed_sla:
- if task.email:
- if isinstance(task.email, str):
- emails.update(get_email_address_list(task.email))
- elif isinstance(task.email, (list, tuple)):
- emails.update(task.email)
- if emails:
- try:
- send_email(emails, f"[airflow] SLA miss on
DAG={dag.dag_id}", email_content)
- email_sent = True
- notification_sent = True
- except Exception:
- Stats.incr("sla_email_notification_failure",
tags={"dag_id": dag.dag_id})
- cls.logger().exception(
- "Could not send SLA Miss email notification for DAG
%s", dag.dag_id
- )
- # If we sent any notification, update the sla_miss table
- if notification_sent:
- for sla in slas:
- sla.email_sent = email_sent
- sla.notification_sent = True
- session.merge(sla)
- session.commit()
-
@staticmethod
@internal_api_call
@provide_session
@@ -748,13 +569,6 @@ class DagFileProcessor(LoggingMixin):
try:
if isinstance(request, TaskCallbackRequest):
cls._execute_task_callbacks(dagbag, request,
unit_test_mode, session=session)
- elif isinstance(request, SlaCallbackRequest):
- if InternalApiConfig.get_use_internal_api():
- cls.logger().warning(
- "SlaCallbacks are not supported when the Internal
API is enabled"
- )
- else:
- DagFileProcessor.manage_slas(dagbag.dag_folder,
request.dag_id, session=session)
elif isinstance(request, DagCallbackRequest):
cls._execute_dag_callbacks(dagbag, request,
session=session)
except Exception:
diff --git a/airflow/example_dags/example_sla_dag.py
b/airflow/example_dags/example_sla_dag.py
deleted file mode 100644
index aca1277e88..0000000000
--- a/airflow/example_dags/example_sla_dag.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Example DAG demonstrating SLA use in Tasks"""
-
-from __future__ import annotations
-
-import datetime
-import time
-
-import pendulum
-
-from airflow.decorators import dag, task
-
-
-# [START howto_task_sla]
-def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
- print(
- "The callback arguments are: ",
- {
- "dag": dag,
- "task_list": task_list,
- "blocking_task_list": blocking_task_list,
- "slas": slas,
- "blocking_tis": blocking_tis,
- },
- )
-
-
-@dag(
- schedule="*/2 * * * *",
- start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
- catchup=False,
- sla_miss_callback=sla_callback,
- default_args={"email": "[email protected]"},
-)
-def example_sla_dag():
- @task(sla=datetime.timedelta(seconds=10))
- def sleep_20():
- """Sleep for 20 seconds"""
- time.sleep(20)
-
- @task
- def sleep_30():
- """Sleep for 30 seconds"""
- time.sleep(30)
-
- sleep_20() >> sleep_30()
-
-
-example_dag = example_sla_dag()
-
-# [END howto_task_sla]
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index a49c2361ec..9438edd4d9 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -36,7 +36,7 @@ from sqlalchemy.orm import lazyload, load_only,
make_transient, selectinload
from sqlalchemy.sql import expression
from airflow import settings
-from airflow.callbacks.callback_requests import DagCallbackRequest,
SlaCallbackRequest, TaskCallbackRequest
+from airflow.callbacks.callback_requests import DagCallbackRequest,
TaskCallbackRequest
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.configuration import conf
from airflow.exceptions import UnknownExecutorException
@@ -1724,37 +1724,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return True
def _send_dag_callbacks_to_processor(self, dag: DAG, callback:
DagCallbackRequest | None = None) -> None:
- self._send_sla_callbacks_to_processor(dag)
if callback:
self.job.executor.send_callback(callback)
else:
self.log.debug("callback is empty")
- def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
- """Send SLA Callbacks to DagFileProcessor if tasks have SLAs set and
check_slas=True."""
- if not settings.CHECK_SLAS:
- return
-
- if not any(isinstance(task.sla, timedelta) for task in dag.tasks):
- self.log.debug("Skipping SLA check for %s because no tasks in DAG
have SLAs", dag)
- return
-
- if not dag.timetable.periodic:
- self.log.debug("Skipping SLA check for %s because DAG is not
scheduled", dag)
- return
-
- dag_model = DagModel.get_dagmodel(dag.dag_id)
- if not dag_model:
- self.log.error("Couldn't find DAG %s in database!", dag.dag_id)
- return
-
- request = SlaCallbackRequest(
- full_filepath=dag.fileloc,
- dag_id=dag.dag_id,
- processor_subdir=dag_model.processor_subdir,
- )
- self.job.executor.send_callback(request)
-
@provide_session
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) ->
None:
"""
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 8f95d1eee7..20656586ba 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -677,17 +677,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
way to limit concurrency for certain tasks
:param pool_slots: the number of pool slots this task should use (>= 1)
Values less than 1 are not allowed.
- :param sla: time by which the job is expected to succeed. Note that
- this represents the ``timedelta`` after the period is closed. For
- example if you set an SLA of 1 hour, the scheduler would send an email
- soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
- has not succeeded yet.
- The scheduler pays special attention for jobs with an SLA and
- sends alert
- emails for SLA misses. SLA misses are also recorded in the database
- for future reference. All tasks that share the same SLA time
- get bundled in a single email, sent soon after that time. SLA
- notification are sent once and only once for each task instance.
+ :param sla: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be
replaced with a new implementation in 3.1
:param execution_timeout: max time allowed for the execution of
this task instance, if it goes beyond it will raise and fail.
:param on_failure_callback: a function or list of functions to be called
when a task instance
@@ -975,7 +965,11 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
if self.pool_slots < 1:
dag_str = f" in dag {dag.dag_id}" if dag else ""
raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot
be less than 1")
- self.sla = sla
+
+ if sla:
+ self.log.warning(
+ "The SLA feature is removed in Airflow 3.0, to be replaced
with a new implementation in 3.1"
+ )
if not TriggerRule.is_valid(trigger_rule):
raise AirflowException(
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 215dae298f..91f8aec730 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -42,7 +42,6 @@ from typing import (
Container,
Iterable,
Iterator,
- List,
MutableSet,
Pattern,
Sequence,
@@ -146,7 +145,6 @@ if TYPE_CHECKING:
from airflow.decorators import TaskDecoratorCollection
from airflow.models.dagbag import DagBag
from airflow.models.operator import Operator
- from airflow.models.slamiss import SlaMiss
from airflow.serialization.pydantic.dag import DagModelPydantic
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.typing_compat import Literal
@@ -169,8 +167,6 @@ ScheduleArg = Union[
Collection[Union["Dataset", "DatasetAlias"]],
]
-SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"],
List[TaskInstance]], None]
-
class InconsistentDataInterval(AirflowException):
"""
@@ -430,10 +426,7 @@ class DAG(LoggingMixin):
beyond this the scheduler will disable the DAG
:param dagrun_timeout: Specify the duration a DagRun should be allowed to
run before it times out or
fails. Task instances that are running when a DagRun is timed out will
be marked as skipped.
- :param sla_miss_callback: specify a function or list of functions to call
when reporting SLA
- timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for
- more information about the function signature and parameters that are
- passed to the callback.
+ :param sla_miss_callback: DEPRECATED - The SLA feature is removed in
Airflow 3.0, to be replaced with a new implementation in 3.1
:param default_view: Specify DAG default view (grid, graph, duration,
gantt, landing_times),
default grid
:param orientation: Specify DAG orientation in graph view (LR, TB, RL,
BT), default LR
@@ -519,7 +512,7 @@ class DAG(LoggingMixin):
"core", "max_consecutive_failed_dag_runs_per_dag"
),
dagrun_timeout: timedelta | None = None,
- sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] =
None,
+ sla_miss_callback: Any = None,
default_view: str = airflow_conf.get_mandatory_value("webserver",
"dag_default_view").lower(),
orientation: str = airflow_conf.get_mandatory_value("webserver",
"dag_orientation"),
catchup: bool = airflow_conf.getboolean("scheduler",
"catchup_by_default"),
@@ -639,7 +632,10 @@ class DAG(LoggingMixin):
f"requires max_active_runs <=
{self.timetable.active_runs_limit}"
)
self.dagrun_timeout = dagrun_timeout
- self.sla_miss_callback = sla_miss_callback
+ if sla_miss_callback:
+ log.warning(
+ "The SLA feature is removed in Airflow 3.0, to be replaced
with a new implementation in 3.1"
+ )
if default_view in DEFAULT_VIEW_PRESETS:
self._default_view: str = default_view
else:
@@ -3297,7 +3293,7 @@ def dag(
"core", "max_consecutive_failed_dag_runs_per_dag"
),
dagrun_timeout: timedelta | None = None,
- sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
+ sla_miss_callback: Any = None,
default_view: str = airflow_conf.get_mandatory_value("webserver",
"dag_default_view").lower(),
orientation: str = airflow_conf.get_mandatory_value("webserver",
"dag_orientation"),
catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"),
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 2cb7d993fc..8a9e790ea7 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, Any, ClassVar, Collection,
Iterable, Iterator,
import attr
import methodtools
-from airflow.exceptions import AirflowException, UnmappableOperator
+from airflow.exceptions import UnmappableOperator
from airflow.models.abstractoperator import (
DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
@@ -328,11 +328,6 @@ class MappedOperator(AbstractOperator):
for k, v in self.partial_kwargs.items():
if k in self.template_fields:
XComArg.apply_upstream_relationship(self, v)
- if self.partial_kwargs.get("sla") is not None:
- raise AirflowException(
- f"SLAs are unsupported with mapped tasks. Please set
`sla=None` for task "
- f"{self.task_id!r}."
- )
@methodtools.lru_cache(maxsize=None)
@classmethod
@@ -547,14 +542,6 @@ class MappedOperator(AbstractOperator):
def weight_rule(self, value: str | PriorityWeightStrategy) -> None:
self.partial_kwargs["weight_rule"] =
validate_and_load_priority_weight_strategy(value)
- @property
- def sla(self) -> datetime.timedelta | None:
- return self.partial_kwargs.get("sla")
-
- @sla.setter
- def sla(self, value: datetime.timedelta | None) -> None:
- self.partial_kwargs["sla"] = value
-
@property
def max_active_tis_per_dag(self) -> int | None:
return self.partial_kwargs.get("max_active_tis_per_dag")
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index f216ce7316..49a3de3d77 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -71,6 +71,5 @@ class DagAttributeTypes(str, Enum):
ARG_NOT_SET = "arg_not_set"
TASK_CALLBACK_REQUEST = "task_callback_request"
DAG_CALLBACK_REQUEST = "dag_callback_request"
- SLA_CALLBACK_REQUEST = "sla_callback_request"
TASK_INSTANCE_KEY = "task_instance_key"
TRIGGER = "trigger"
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 12310685ec..c9c1f11835 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -34,7 +34,7 @@ from dateutil import relativedelta
from pendulum.tz.timezone import FixedTimezone, Timezone
from airflow import macros
-from airflow.callbacks.callback_requests import DagCallbackRequest,
SlaCallbackRequest, TaskCallbackRequest
+from airflow.callbacks.callback_requests import DagCallbackRequest,
TaskCallbackRequest
from airflow.compat.functools import cache
from airflow.datasets import (
BaseDataset,
@@ -758,8 +758,6 @@ class BaseSerialization:
return cls._encode(var.to_json(), type_=DAT.TASK_CALLBACK_REQUEST)
elif isinstance(var, DagCallbackRequest):
return cls._encode(var.to_json(), type_=DAT.DAG_CALLBACK_REQUEST)
- elif isinstance(var, SlaCallbackRequest):
- return cls._encode(var.to_json(), type_=DAT.SLA_CALLBACK_REQUEST)
elif var.__class__ == Context:
d = {}
for k, v in var._context.items():
@@ -890,8 +888,6 @@ class BaseSerialization:
return TaskCallbackRequest.from_json(var)
elif type_ == DAT.DAG_CALLBACK_REQUEST:
return DagCallbackRequest.from_json(var)
- elif type_ == DAT.SLA_CALLBACK_REQUEST:
- return SlaCallbackRequest.from_json(var)
elif type_ == DAT.TASK_INSTANCE_KEY:
return TaskInstanceKey(**var)
elif use_pydantic_models and _ENABLE_AIP_44:
@@ -1289,7 +1285,7 @@ class SerializedBaseOperator(BaseOperator,
BaseSerialization):
continue
elif k == "downstream_task_ids":
v = set(v)
- elif k in {"retry_delay", "execution_timeout", "sla",
"max_retry_delay"}:
+ elif k in {"retry_delay", "execution_timeout", "max_retry_delay"}:
v = cls._deserialize_timedelta(v)
elif k in encoded_op["template_fields"]:
pass
diff --git a/airflow/settings.py b/airflow/settings.py
index a242ce4da7..7a805f64a2 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -781,9 +781,6 @@ EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or
conf.getboolean(
ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler",
"allow_trigger_in_future", fallback=False)
-# Whether or not to check each dagrun against defined SLAs
-CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)
-
USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule",
fallback=True)
# By default Airflow plugins are lazily-loaded (only loaded when required).
Set it to False,
diff --git a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst
index c10b8cbae4..a52540fe78 100644
--- a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst
+++ b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst
@@ -23,10 +23,6 @@ Introduction
Chime notifier
(:class:`airflow.providers.amazon.aws.notifications.chime.ChimeNotifier`)
allows users to send
messages to a Chime chat room setup via a webhook using the various
``on_*_callbacks`` at both the DAG level and Task level
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with `sla_miss_callback` the context will contain
only values passed to the callback, refer
:ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git a/docs/apache-airflow-providers-amazon/notifications/sns.rst
b/docs/apache-airflow-providers-amazon/notifications/sns.rst
index 337e82cf62..bbaad4f814 100644
--- a/docs/apache-airflow-providers-amazon/notifications/sns.rst
+++ b/docs/apache-airflow-providers-amazon/notifications/sns.rst
@@ -25,11 +25,6 @@ Introduction
`Amazon SNS <https://aws.amazon.com/sns/>`__ notifier
:class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier`
allows users to push messages to a SNS Topic using the various
``on_*_callbacks`` at both the DAG level and Task level.
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with ``sla_miss_callback`` the context will
contain only values passed to the callback,
- refer :ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git a/docs/apache-airflow-providers-amazon/notifications/sqs.rst
b/docs/apache-airflow-providers-amazon/notifications/sqs.rst
index 4a2232b006..6951caa9fd 100644
--- a/docs/apache-airflow-providers-amazon/notifications/sqs.rst
+++ b/docs/apache-airflow-providers-amazon/notifications/sqs.rst
@@ -25,11 +25,6 @@ Introduction
`Amazon SQS <https://aws.amazon.com/sqs/>`__ notifier
:class:`~airflow.providers.amazon.aws.notifications.sqs.SqsNotifier`
allows users to push messages to an Amazon SQS Queue using the various
``on_*_callbacks`` at both the DAG level and Task level.
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with ``sla_miss_callback`` the context will
contain only values passed to the callback,
- refer :ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git a/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst b/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst
index d93d5a2fc5..d16f9b2b9e 100644
--- a/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst
+++ b/docs/apache-airflow-providers-pagerduty/notifications/pagerduty_notifier_howto_guide.rst
@@ -23,10 +23,6 @@ Introduction
The Pagerduty notifier
(:class:`airflow.providers.pagerduty.notifications.pagerduty.PagerdutyNotifier`)
allows users to send
messages to Pagerduty using the various ``on_*_callbacks`` at both the DAG
level and Task level.
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with `sla_miss_callback` the context will contain
only values passed to the callback, refer
:ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git
a/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst
b/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst
index d967779cee..a4f891f8a5 100644
---
a/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst
+++
b/docs/apache-airflow-providers-slack/notifications/slack_notifier_howto_guide.rst
@@ -23,10 +23,6 @@ Introduction
Slack notifier
(:class:`airflow.providers.slack.notifications.slack.SlackNotifier`) allows
users to send
messages to a slack channel using the various ``on_*_callbacks`` at both the
DAG level and Task level
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with `sla_miss_callback` the context will contain
only values passed to the callback, refer
:ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git
a/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst
b/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst
index bb9e85c674..66ced818a7 100644
---
a/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst
+++
b/docs/apache-airflow-providers-slack/notifications/slackwebhook_notifier_howto_guide.rst
@@ -24,10 +24,6 @@ Slack Incoming Webhook notifier
(:class:`airflow.providers.slack.notifications.s
allows users to send messages to a slack channel through `Incoming Webhook
<https://api.slack.com/messaging/webhooks>`__
using the various ``on_*_callbacks`` at both the DAG level and Task level
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with `sla_miss_callback` the context will contain
only values passed to the callback, refer
:ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git
a/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst
b/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst
index c7183c5e56..4cb1bf310e 100644
---
a/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst
+++
b/docs/apache-airflow-providers-smtp/notifications/smtp_notifier_howto_guide.rst
@@ -23,10 +23,6 @@ Introduction
The SMTP notifier
(:class:`airflow.providers.smtp.notifications.smtp.SmtpNotifier`) allows users
to send
messages to SMTP servers using the various ``on_*_callbacks`` at both the DAG
level and Task level.
-You can also use a notifier with ``sla_miss_callback``.
-
-.. note::
- When notifiers are used with `sla_miss_callback` the context will contain
only values passed to the callback, refer
:ref:`sla_miss_callback<concepts:sla_miss_callback>`.
Example Code:
-------------
diff --git
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst
index a70a876ba3..b54071373c 100644
---
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst
+++
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/callbacks.rst
@@ -46,7 +46,6 @@ Name Description
===========================================
================================================================
``on_success_callback`` Invoked when the task
:ref:`succeeds <concepts:task-instances>`
``on_failure_callback`` Invoked when the task :ref:`fails
<concepts:task-instances>`
-``sla_miss_callback`` Invoked when a task misses its
defined :ref:`SLA <concepts:slas>`
``on_retry_callback`` Invoked when the task is :ref:`up
for retry <concepts:task-instances>`
``on_execute_callback`` Invoked right before the task
begins executing.
``on_skipped_callback`` Invoked when the task is
:ref:`running <concepts:task-instances>` and AirflowSkipException raised.
diff --git
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index c8522bee3b..61985cecea 100644
---
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -164,7 +164,6 @@ Name
Descripti
Metric
with file_path and action tagging.
``dag_processing.processor_timeouts`` Number
of file processors that have been killed due to taking too long.
Metric
with file_path tagging.
-``dag_processing.sla_callback_count`` Number
of SLA callbacks received
``dag_processing.other_callback_count`` Number
of non-SLA callbacks received
``dag_processing.file_path_queue_update_count`` Number
of times we've scanned the filesystem and queued all existing dags
``dag_file_processor_timeouts``
(DEPRECATED) same behavior as ``dag_processing.processor_timeouts``
@@ -176,9 +175,6 @@ Name
Descripti
``scheduler.critical_section_busy`` Count
of times a scheduler process tried to get a lock on the critical
section
(needed to send tasks to the executor) and found it locked by
another
process.
-``sla_missed`` Number
of SLA misses. Metric with dag_id and task_id tagging.
-``sla_callback_notification_failure`` Number
of failed SLA miss callback notification attempts. Metric with dag_id and
func_name tagging.
-``sla_email_notification_failure`` Number
of failed SLA miss email notification attempts. Metric with dag_id tagging.
``ti.start.<dag_id>.<task_id>`` Number
of started task in a given dag. Similar to <job_name>_start but for task
``ti.start`` Number
of started task in a given dag. Similar to <job_name>_start but for task.
Metric
with dag_id and task_id tagging.
diff --git a/docs/apache-airflow/core-concepts/tasks.rst
b/docs/apache-airflow/core-concepts/tasks.rst
index 0e05f55bcf..ad03283ef7 100644
--- a/docs/apache-airflow/core-concepts/tasks.rst
+++ b/docs/apache-airflow/core-concepts/tasks.rst
@@ -149,82 +149,11 @@ is periodically executed and rescheduled until it
succeeds.
mode="reschedule",
)
-If you merely want to be notified if a task runs over but still let it run to
completion, you want :ref:`concepts:slas` instead.
-
-
-.. _concepts:slas:
SLAs
----
-An SLA, or a Service Level Agreement, is an expectation for the maximum time a
Task should be completed relative to the Dag Run start time. If a task takes
longer than this to run, it is then visible in the "SLA Misses" part of the
user interface, as well as going out in an email of all tasks that missed their
SLA.
-
-Tasks over their SLA are not cancelled, though - they are allowed to run to
completion. If you want to cancel a task after a certain runtime is reached,
you want :ref:`concepts:timeouts` instead.
-
-To set an SLA for a task, pass a ``datetime.timedelta`` object to the
Task/Operator's ``sla`` parameter. You can also supply an
``sla_miss_callback`` that will be called when the SLA is missed if you want to
run your own logic.
-
-If you want to disable SLA checking entirely, you can set ``check_slas =
False`` in Airflow's ``[core]`` configuration.
-
-To read more about configuring the emails, see :doc:`/howto/email-config`.
-
-.. note::
-
- Manually-triggered tasks and tasks in event-driven DAGs will not be
checked for an SLA miss. For more information on DAG ``schedule`` values see
:doc:`DAG Run <dag-run>`.
-
-.. _concepts:sla_miss_callback:
-
-sla_miss_callback
-~~~~~~~~~~~~~~~~~
-
-You can also supply an ``sla_miss_callback`` that will be called when the SLA
is missed if you want to run your own logic.
-The function signature of an ``sla_miss_callback`` requires 5 parameters.
-
-#. ``dag``
-
- * Parent :ref:`DAG <concepts-dags>` Object for the :doc:`DAGRun <dag-run>`
in which tasks missed their
- :ref:`SLA <concepts:slas>`.
-
-#. ``task_list``
-
- * String list (new-line separated, \\n) of all tasks that missed their
:ref:`SLA <concepts:slas>`
- since the last time that the ``sla_miss_callback`` ran.
-
-#. ``blocking_task_list``
-
- * Any task in the :doc:`DAGRun(s)<dag-run>` (with the same
``execution_date`` as a task that missed
- :ref:`SLA <concepts:slas>`) that is not in a **SUCCESS** state at the
time that the ``sla_miss_callback``
- runs. i.e. 'running', 'failed'. These tasks are described as tasks that
are blocking itself or another
- task from completing before its SLA window is complete.
-
-#. ``slas``
-
- * List of :py:mod:`SlaMiss<airflow.models.slamiss>` objects associated
with the tasks in the
- ``task_list`` parameter.
-
-#. ``blocking_tis``
-
- * List of the :ref:`TaskInstance <concepts:task-instances>` objects that
are associated with the tasks
- in the ``blocking_task_list`` parameter.
-
-Examples of ``sla_miss_callback`` function signature:
-
-.. code-block:: python
-
- def my_sla_miss_callback(dag, task_list, blocking_task_list, slas,
blocking_tis):
- ...
-
-.. code-block:: python
-
- def my_sla_miss_callback(*args):
- ...
-
-Example DAG:
-
-.. exampleinclude:: /../../airflow/example_dags/example_sla_dag.py
- :language: python
- :start-after: [START howto_task_sla]
- :end-before: [END howto_task_sla]
-
+The SLA feature from Airflow 2 has been removed in 3.0 and will be replaced
with a new implementation in Airflow 3.1.
Special Exceptions
------------------
diff --git a/docs/conf.py b/docs/conf.py
index c87871e7ed..4d01e40219 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -755,7 +755,6 @@ autoapi_ignore = [
"*/node_modules/*",
"*/migrations/*",
"*/contrib/*",
- "**/example_sla_dag.py",
"**/example_taskflow_api_docker_virtualenv.py",
"**/example_dag_decorator.py",
]
diff --git a/newsfragments/42285.significant.rst
b/newsfragments/42285.significant.rst
new file mode 100644
index 0000000000..8f8cfa0dee
--- /dev/null
+++ b/newsfragments/42285.significant.rst
@@ -0,0 +1 @@
+The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts
in 3.1
diff --git a/tests/callbacks/test_callback_requests.py
b/tests/callbacks/test_callback_requests.py
index 6d900c8bd3..5992ee6fbb 100644
--- a/tests/callbacks/test_callback_requests.py
+++ b/tests/callbacks/test_callback_requests.py
@@ -23,7 +23,6 @@ import pytest
from airflow.callbacks.callback_requests import (
CallbackRequest,
DagCallbackRequest,
- SlaCallbackRequest,
TaskCallbackRequest,
)
from airflow.models.dag import DAG
@@ -55,14 +54,6 @@ class TestCallbackRequest:
),
DagCallbackRequest,
),
- (
- SlaCallbackRequest(
- full_filepath="filepath",
- dag_id="fake_dag",
- processor_subdir="/test_dir",
- ),
- SlaCallbackRequest,
- ),
],
)
def test_from_json(self, input, request_class):
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index 8112b7222a..1d3fefdf12 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -39,7 +39,7 @@ import pytest
import time_machine
from sqlalchemy import func
-from airflow.callbacks.callback_requests import CallbackRequest,
DagCallbackRequest, SlaCallbackRequest
+from airflow.callbacks.callback_requests import CallbackRequest,
DagCallbackRequest
from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
from airflow.configuration import conf
from airflow.dag_processing.manager import (
@@ -1179,16 +1179,10 @@ class TestDagProcessorJobRunner:
processor_subdir=os.fspath(tmp_path),
run_id="456",
)
- callback3 = SlaCallbackRequest(
- dag_id="test_start_date_scheduling",
- full_filepath=str(dag_filepath),
- processor_subdir=os.fspath(tmp_path),
- )
with create_session() as session:
session.add(DbCallbackRequest(callback=callback1,
priority_weight=11))
session.add(DbCallbackRequest(callback=callback2,
priority_weight=10))
- session.add(DbCallbackRequest(callback=callback3,
priority_weight=9))
child_pipe, parent_pipe = multiprocessing.Pipe()
manager = DagProcessorJobRunner(
@@ -1371,16 +1365,6 @@ class TestDagProcessorJobRunner:
processor_subdir=tmp_path,
msg=None,
)
- dag1_sla1 = SlaCallbackRequest(
- full_filepath="/green_eggs/ham/file1.py",
- dag_id="dag1",
- processor_subdir=tmp_path,
- )
- dag1_sla2 = SlaCallbackRequest(
- full_filepath="/green_eggs/ham/file1.py",
- dag_id="dag1",
- processor_subdir=tmp_path,
- )
dag2_req1 = DagCallbackRequest(
full_filepath="/green_eggs/ham/file2.py",
@@ -1391,15 +1375,8 @@ class TestDagProcessorJobRunner:
msg=None,
)
- dag3_sla1 = SlaCallbackRequest(
- full_filepath="/green_eggs/ham/file3.py",
- dag_id="dag3",
- processor_subdir=tmp_path,
- )
-
# when
manager.processor._add_callback_to_queue(dag1_req1)
- manager.processor._add_callback_to_queue(dag1_sla1)
manager.processor._add_callback_to_queue(dag2_req1)
# then - requests should be in manager's queue, with dag2 ahead of
dag1 (because it was added last)
@@ -1408,18 +1385,10 @@ class TestDagProcessorJobRunner:
dag1_req1.full_filepath,
dag2_req1.full_filepath,
}
- assert manager.processor._callback_to_execute[dag1_req1.full_filepath]
== [dag1_req1, dag1_sla1]
assert manager.processor._callback_to_execute[dag2_req1.full_filepath]
== [dag2_req1]
- # when
- manager.processor._add_callback_to_queue(dag1_sla2)
- manager.processor._add_callback_to_queue(dag3_sla1)
-
- # then - since sla2 == sla1, should not have brought dag1 to the fore,
and an SLA on dag3 doesn't
- # update the queue, although the callback is registered
+ # and the file path queue keeps dag2 ahead of dag1
assert manager.processor._file_path_queue ==
deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
- assert manager.processor._callback_to_execute[dag1_req1.full_filepath]
== [dag1_req1, dag1_sla1]
- assert manager.processor._callback_to_execute[dag3_sla1.full_filepath]
== [dag3_sla1]
# when
manager.processor._add_callback_to_queue(dag1_req2)
@@ -1428,7 +1397,6 @@ class TestDagProcessorJobRunner:
assert manager.processor._file_path_queue ==
deque([dag1_req1.full_filepath, dag2_req1.full_filepath])
assert manager.processor._callback_to_execute[dag1_req1.full_filepath]
== [
dag1_req1,
- dag1_sla1,
dag1_req2,
]
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index 2b250ae8c5..d7b2b21166 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -32,10 +32,9 @@ from airflow.callbacks.callback_requests import
TaskCallbackRequest
from airflow.configuration import TEST_DAGS_FOLDER, conf
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessor,
DagFileProcessorProcess
-from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance
+from airflow.models import DagBag, DagModel, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
-from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
@@ -50,7 +49,6 @@ from tests.test_utils.db import (
clear_db_pools,
clear_db_runs,
clear_db_serialized_dags,
- clear_db_sla_miss,
)
from tests.test_utils.mock_executor import MockExecutor
@@ -89,7 +87,6 @@ class TestDagFileProcessor:
clear_db_runs()
clear_db_pools()
clear_db_dags()
- clear_db_sla_miss()
clear_db_import_errors()
clear_db_jobs()
clear_db_serialized_dags()
@@ -116,395 +113,6 @@ class TestDagFileProcessor:
dag_file_processor.process_file(file_path, [], False)
- @pytest.mark.skip_if_database_isolation_mode
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_callback(self, mock_get_dagbag,
create_dummy_dag, get_test_dag):
- """
- Test that the dag file processor calls the sla miss callback
- """
- session = settings.Session()
- sla_callback = MagicMock()
-
- # Create dag with a start of 1 day ago, but a sla of 0, so we'll
already have a sla_miss on the books.
- test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- dag, task = create_dummy_dag(
- dag_id="test_sla_miss",
- task_id="dummy",
- sla_miss_callback=sla_callback,
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta()},
- )
-
- session.merge(
- TaskInstance(
- task=task,
- run_id=test_run_id,
- state=State.SUCCESS,
- )
- )
- session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss",
execution_date=test_start_date))
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
- session.commit()
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
-
- assert sla_callback.called
-
- @pytest.mark.skip_if_database_isolation_mode
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_callback_invalid_sla(self,
mock_get_dagbag, create_dummy_dag):
- """
- Test that the dag file processor does not call the sla miss callback
when
- given an invalid sla
- """
- session = settings.Session()
-
- sla_callback = MagicMock()
-
- # Create dag with a start of 1 day ago, but an sla of 0
- # so we'll already have an sla_miss on the books.
- # Pass anything besides a timedelta object to the sla argument.
- test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- dag, task = create_dummy_dag(
- dag_id="test_sla_miss",
- task_id="dummy",
- sla_miss_callback=sla_callback,
- default_args={"start_date": test_start_date, "sla": None},
- )
-
- session.merge(TaskInstance(task=task, run_id=test_run_id,
state=State.SUCCESS))
- session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss",
execution_date=test_start_date))
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
- sla_callback.assert_not_called()
-
- @pytest.mark.skip_if_database_isolation_mode
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_callback_sent_notification(self,
mock_get_dagbag, create_dummy_dag):
- """
- Test that the dag file processor does not call the sla_miss_callback
when a
- notification has already been sent
- """
- session = settings.Session()
-
- # Mock the callback function so we can verify that it was not called
- sla_callback = MagicMock()
-
- # Create dag with a start of 2 days ago, but an sla of 1 day
- # ago so we'll already have an sla_miss on the books
- test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- dag, task = create_dummy_dag(
- dag_id="test_sla_miss",
- task_id="dummy",
- sla_miss_callback=sla_callback,
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(days=1)},
- )
-
- # Create a TaskInstance for two days ago
- session.merge(TaskInstance(task=task, run_id=test_run_id,
state=State.SUCCESS))
-
- # Create an SlaMiss where notification was sent, but email was not
- session.merge(
- SlaMiss(
- task_id="dummy",
- dag_id="test_sla_miss",
- execution_date=test_start_date,
- email_sent=False,
- notification_sent=True,
- )
- )
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- # Now call manage_slas and see if the sla_miss callback gets called
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
-
- sla_callback.assert_not_called()
-
- @pytest.mark.skip_if_database_isolation_mode
- @mock.patch("airflow.dag_processing.processor.Stats.incr")
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(
- self, mock_get_dagbag, mock_stats_incr, dag_maker
- ):
- """
- Test that the dag file processor does not try to insert already
existing item into the database
- """
- session = settings.Session()
-
- # Create dag with a start of 2 days ago, but an sla of 1 day
- # ago so we'll already have an sla_miss on the books
- test_start_date = timezone.utcnow() - datetime.timedelta(days=2)
- with dag_maker(
- dag_id="test_sla_miss",
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(days=1)},
- ) as dag:
- task = EmptyOperator(task_id="dummy")
-
- dr = dag_maker.create_dagrun(execution_date=test_start_date,
state=State.SUCCESS)
-
- # Create a TaskInstance for two days ago
- ti = TaskInstance(task=task, run_id=dr.run_id, state=State.SUCCESS)
- session.merge(ti)
- session.flush()
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
- sla_miss_count = (
- session.query(SlaMiss)
- .filter(
- SlaMiss.dag_id == dag.dag_id,
- SlaMiss.task_id == task.task_id,
- )
- .count()
- )
- assert sla_miss_count == 1
- mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id":
"test_sla_miss", "task_id": "dummy"})
- # Now call manage_slas and see that it runs without errors
- # because of existing SlaMiss above.
- # Since this is run often, it's possible that it runs before another
- # ti is successful thereby trying to insert a duplicate record.
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
-
- @pytest.mark.skip_if_database_isolation_mode
- @mock.patch("airflow.dag_processing.processor.Stats.incr")
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def
test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_recording_missing_sla(
- self, mock_get_dagbag, mock_stats_incr, dag_maker
- ):
- """
- Test that the dag file processor continue checking subsequent task
instances
- even if the preceding task instance misses the sla ahead
- """
- session = settings.Session()
-
- # Create a dag with a start of 3 days ago and sla of 1 day,
- # so we have 2 missing slas
- now = timezone.utcnow()
- test_start_date = now - datetime.timedelta(days=3)
- # test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- with dag_maker(
- dag_id="test_sla_miss",
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(days=1)},
- ) as dag:
- task = EmptyOperator(task_id="dummy")
-
- dr = dag_maker.create_dagrun(execution_date=test_start_date,
state=State.SUCCESS)
-
- session.merge(TaskInstance(task=task, run_id=dr.run_id,
state="success"))
- session.merge(
- SlaMiss(task_id=task.task_id, dag_id=dag.dag_id,
execution_date=now - datetime.timedelta(days=2))
- )
- session.flush()
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
- sla_miss_count = (
- session.query(SlaMiss)
- .filter(
- SlaMiss.dag_id == dag.dag_id,
- SlaMiss.task_id == task.task_id,
- )
- .count()
- )
- assert sla_miss_count == 2
- mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id":
"test_sla_miss", "task_id": "dummy"})
-
- @pytest.mark.skip_if_database_isolation_mode
- @patch.object(DagFileProcessor, "logger")
- @mock.patch("airflow.dag_processing.processor.Stats.incr")
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_callback_exception(
- self,
- mock_get_dagbag,
- mock_stats_incr,
- mock_get_log,
- create_dummy_dag,
- ):
- """
- Test that the dag file processor gracefully logs an exception if there
is a problem
- calling the sla_miss_callback
- """
- session = settings.Session()
-
- sla_callback = MagicMock(
- __name__="function_name", side_effect=RuntimeError("Could not call
function")
- )
-
- test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
-
- for i, callback in enumerate([[sla_callback], sla_callback]):
- dag, task = create_dummy_dag(
- dag_id=f"test_sla_miss_{i}",
- task_id="dummy",
- sla_miss_callback=callback,
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(hours=1)},
- )
- mock_stats_incr.reset_mock()
-
- session.merge(TaskInstance(task=task, run_id=test_run_id,
state=State.SUCCESS))
-
- # Create an SlaMiss where notification was sent, but email was not
- session.merge(
- SlaMiss(task_id="dummy", dag_id=f"test_sla_miss_{i}",
execution_date=test_start_date)
- )
-
- # Now call manage_slas and see if the sla_miss callback gets called
- mock_log = mock.Mock()
- mock_get_log.return_value = mock_log
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
- assert sla_callback.called
- mock_log.exception.assert_called_once_with(
- "Could not call sla_miss_callback(%s) for DAG %s",
- sla_callback.__name__,
- f"test_sla_miss_{i}",
- )
- mock_stats_incr.assert_called_once_with(
- "sla_callback_notification_failure",
- tags={"dag_id": f"test_sla_miss_{i}", "func_name":
sla_callback.__name__},
- )
-
- @pytest.mark.skip_if_database_isolation_mode
- @mock.patch("airflow.dag_processing.processor.send_email")
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
- self, mock_get_dagbag, mock_send_email, create_dummy_dag
- ):
- session = settings.Session()
-
- test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- email1 = "[email protected]"
- dag, task = create_dummy_dag(
- dag_id="test_sla_miss",
- task_id="sla_missed",
- email=email1,
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(hours=1)},
- )
- session.merge(TaskInstance(task=task, run_id=test_run_id,
state=State.SUCCESS))
-
- email2 = "[email protected]"
- EmptyOperator(task_id="sla_not_missed", dag=dag, owner="airflow",
email=email2)
-
- session.merge(SlaMiss(task_id="sla_missed", dag_id="test_sla_miss",
execution_date=test_start_date))
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
-
- assert len(mock_send_email.call_args_list) == 1
-
- send_email_to = mock_send_email.call_args_list[0][0][0]
- assert email1 in send_email_to
- assert email2 not in send_email_to
-
- @pytest.mark.skip_if_database_isolation_mode
- @patch.object(DagFileProcessor, "logger")
- @mock.patch("airflow.dag_processing.processor.Stats.incr")
- @mock.patch("airflow.utils.email.send_email")
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_email_exception(
- self,
- mock_get_dagbag,
- mock_send_email,
- mock_stats_incr,
- mock_get_log,
- create_dummy_dag,
- ):
- """
- Test that the dag file processor gracefully logs an exception if there
is a problem
- sending an email
- """
- session = settings.Session()
- dag_id = "test_sla_miss"
- task_id = "test_ti"
- email = "[email protected]"
-
- # Mock the callback function so we can verify that it was not called
- mock_send_email.side_effect = RuntimeError("Could not send an email")
-
- test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- dag, task = create_dummy_dag(
- dag_id=dag_id,
- task_id=task_id,
- email=email,
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(hours=1)},
- )
- mock_stats_incr.reset_mock()
-
- session.merge(TaskInstance(task=task, run_id=test_run_id,
state=State.SUCCESS))
-
- # Create an SlaMiss where notification was sent, but email was not
- session.merge(SlaMiss(task_id=task_id, dag_id=dag_id,
execution_date=test_start_date))
-
- mock_log = mock.Mock()
- mock_get_log.return_value = mock_log
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id=dag_id,
session=session)
- mock_log.exception.assert_called_once_with(
- "Could not send SLA Miss email notification for DAG %s", dag_id
- )
-
mock_stats_incr.assert_called_once_with("sla_email_notification_failure",
tags={"dag_id": dag_id})
-
- @pytest.mark.skip_if_database_isolation_mode
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
- def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag,
create_dummy_dag):
- """
- Test that the dag file processor will not crash when trying to send
- sla miss notification for a deleted task
- """
- session = settings.Session()
-
- test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
- test_run_id = DagRunType.SCHEDULED.generate_run_id(test_start_date)
- dag, task = create_dummy_dag(
- dag_id="test_sla_miss",
- task_id="dummy",
- email="[email protected]",
- default_args={"start_date": test_start_date, "sla":
datetime.timedelta(hours=1)},
- )
-
- session.merge(TaskInstance(task=task, run_id=test_run_id,
state=State.SUCCESS))
-
- # Create an SlaMiss where notification was sent, but email was not
- session.merge(
- SlaMiss(task_id="dummy_deleted", dag_id="test_sla_miss",
execution_date=test_start_date)
- )
-
- mock_dagbag = mock.Mock()
- mock_dagbag.get_dag.return_value = dag
- mock_get_dagbag.return_value = mock_dagbag
-
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc,
dag_id="test_sla_miss", session=session)
-
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db
isolation mode
@patch.object(TaskInstance, "handle_failure")
def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2292f0130e..52e9dbdeb1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -36,7 +36,7 @@ from sqlalchemy import func, select, update
import airflow.example_dags
from airflow import settings
-from airflow.callbacks.callback_requests import DagCallbackRequest,
SlaCallbackRequest, TaskCallbackRequest
+from airflow.callbacks.callback_requests import DagCallbackRequest,
TaskCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.dag_processing.manager import DagFileProcessorAgent
@@ -3987,82 +3987,6 @@ class TestSchedulerJob:
assert old_task_job.state == State.RUNNING
assert "Marked 1 SchedulerJob instances as failed" in caplog.messages
- def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
- """Test SLA Callbacks are not sent when check_slas is False"""
- dag_id = "test_send_sla_callbacks_to_processor_sla_disabled"
- with dag_maker(dag_id=dag_id, schedule="@daily") as dag:
- EmptyOperator(task_id="task1")
-
- with patch.object(settings, "CHECK_SLAS", False):
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job,
subdir=os.devnull)
- scheduler_job.executor = MockExecutor()
- self.job_runner._send_sla_callbacks_to_processor(dag)
- scheduler_job.executor.callback_sink.send.assert_not_called()
-
- def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker):
- """Test SLA Callbacks are not sent when no task SLAs are defined"""
- dag_id = "test_send_sla_callbacks_to_processor_sla_no_task_slas"
- with dag_maker(dag_id=dag_id, schedule="@daily") as dag:
- EmptyOperator(task_id="task1")
-
- with patch.object(settings, "CHECK_SLAS", True):
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job,
subdir=os.devnull)
- scheduler_job.executor = MockExecutor()
- self.job_runner._send_sla_callbacks_to_processor(dag)
- scheduler_job.executor.callback_sink.send.assert_not_called()
-
- @pytest.mark.parametrize(
- "schedule",
- [
- "@daily",
- "0 10 * * *",
- timedelta(hours=2),
- ],
- )
- def test_send_sla_callbacks_to_processor_sla_with_task_slas(self,
schedule, dag_maker):
- """Test SLA Callbacks are sent to the DAG Processor when SLAs are
defined on tasks"""
- dag_id = "test_send_sla_callbacks_to_processor_sla_with_task_slas"
- with dag_maker(
- dag_id=dag_id,
- schedule=schedule,
- processor_subdir=TEST_DAG_FOLDER,
- ) as dag:
- EmptyOperator(task_id="task1", sla=timedelta(seconds=60))
-
- with patch.object(settings, "CHECK_SLAS", True):
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job,
subdir=os.devnull)
- scheduler_job.executor = MockExecutor()
- self.job_runner._send_sla_callbacks_to_processor(dag)
- expected_callback = SlaCallbackRequest(
- full_filepath=dag.fileloc,
- dag_id=dag.dag_id,
- processor_subdir=TEST_DAG_FOLDER,
- )
-
scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
-
- @pytest.mark.parametrize(
- "schedule",
- [
- None,
- [Dataset("foo")],
- ],
- )
- def test_send_sla_callbacks_to_processor_sla_dag_not_scheduled(self,
schedule, dag_maker):
- """Test SLA Callbacks are not sent when DAG isn't scheduled"""
- dag_id = "test_send_sla_callbacks_to_processor_sla_no_task_slas"
- with dag_maker(dag_id=dag_id, schedule=schedule) as dag:
- EmptyOperator(task_id="task1", sla=timedelta(seconds=5))
-
- with patch.object(settings, "CHECK_SLAS", True):
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job,
subdir=os.devnull)
- scheduler_job.executor = MockExecutor()
- self.job_runner._send_sla_callbacks_to_processor(dag)
- scheduler_job.executor.callback_sink.send.assert_not_called()
-
@pytest.mark.parametrize(
"schedule, number_running, excepted",
[
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index 2aa5b76b22..3c5b7634d5 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -304,51 +304,6 @@ class TestBaseOperator:
result = task.render_template(content, context)
assert result == expected_output
- def test_mapped_dag_slas_disabled_classic(self):
- class MyOp(BaseOperator):
- def __init__(self, x, **kwargs):
- self.x = x
- super().__init__(**kwargs)
-
- def execute(self, context):
- print(self.x)
-
- with DAG(
- dag_id="test-dag",
- schedule=None,
- start_date=DEFAULT_DATE,
- default_args={"sla": timedelta(minutes=30)},
- ) as dag:
-
- @dag.task
- def get_values():
- return [0, 1, 2]
-
- task1 = get_values()
- with pytest.raises(AirflowException, match="SLAs are unsupported
with mapped tasks"):
- MyOp.partial(task_id="hi").expand(x=task1)
-
- def test_mapped_dag_slas_disabled_taskflow(self):
- with DAG(
- dag_id="test-dag",
- schedule=None,
- start_date=DEFAULT_DATE,
- default_args={"sla": timedelta(minutes=30)},
- ) as dag:
-
- @dag.task
- def get_values():
- return [0, 1, 2]
-
- task1 = get_values()
-
- @dag.task
- def print_val(x):
- print(x)
-
- with pytest.raises(AirflowException, match="SLAs are unsupported
with mapped tasks"):
- print_val.expand(x=task1)
-
@pytest.mark.db_test
def test_render_template_fields(self):
"""Verify if operator attributes are correctly templated."""
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index 7dfe57054c..758c7f496e 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -143,7 +143,6 @@ serialized_simple_dag_ground_truth = {
"retries": 1,
"retry_delay": {"__type": "timedelta", "__var": 300.0},
"max_retry_delay": {"__type": "timedelta", "__var": 600.0},
- "sla": {"__type": "timedelta", "__var": 100.0},
},
},
"start_date": 1564617600.0,
@@ -179,7 +178,6 @@ serialized_simple_dag_ground_truth = {
"retries": 1,
"retry_delay": 300.0,
"max_retry_delay": 600.0,
- "sla": 100.0,
"downstream_task_ids": [],
"_is_empty": False,
"ui_color": "#f0ede4",
@@ -218,7 +216,6 @@ serialized_simple_dag_ground_truth = {
"retries": 1,
"retry_delay": 300.0,
"max_retry_delay": 600.0,
- "sla": 100.0,
"downstream_task_ids": [],
"_is_empty": False,
"_operator_extra_links":
[{"tests.test_utils.mock_operators.CustomOpLink": {}}],
@@ -290,7 +287,6 @@ def make_simple_dag():
"retry_delay": timedelta(minutes=5),
"max_retry_delay": timedelta(minutes=10),
"depends_on_past": False,
- "sla": timedelta(seconds=100),
},
start_date=datetime(2019, 8, 1),
is_paused_upon_creation=False,
@@ -1299,7 +1295,6 @@ class TestStringifiedDAGs:
"retry_delay": timedelta(0, 300),
"retry_exponential_backoff": False,
"run_as_user": None,
- "sla": None,
"task_id": "10",
"trigger_rule": "all_success",
"wait_for_downstream": False,