This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 583caf8710f Make `sla` params no-op with deprecation warning (#48460)
583caf8710f is described below
commit 583caf8710f2820f5d65ec02174ec354720ad348
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Mar 28 01:33:48 2025 +0530
Make `sla` params no-op with deprecation warning (#48460)
This will ensure that existing 2.x dags with SLAs don't break.
---
.../unit/serialization/test_dag_serialization.py | 1 -
.../ci/pre_commit/check_init_decorator_arguments.py | 4 +++-
task-sdk/src/airflow/sdk/definitions/baseoperator.py | 20 +++++++-------------
task-sdk/src/airflow/sdk/definitions/dag.py | 15 ++++++++++++---
4 files changed, 22 insertions(+), 18 deletions(-)
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 4cfa53f7be0..c0cb347b4dc 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1395,7 +1395,6 @@ class TestStringifiedDAGs:
"retry_delay": timedelta(0, 300),
"retry_exponential_backoff": False,
"run_as_user": None,
- "sla": None,
"start_date": None,
"start_from_trigger": False,
"start_trigger_args": None,
diff --git a/scripts/ci/pre_commit/check_init_decorator_arguments.py
b/scripts/ci/pre_commit/check_init_decorator_arguments.py
index 5eec4c4eb41..f3236c733ac 100755
--- a/scripts/ci/pre_commit/check_init_decorator_arguments.py
+++ b/scripts/ci/pre_commit/check_init_decorator_arguments.py
@@ -209,7 +209,9 @@ def check_dag_init_decorator_arguments() -> int:
items_to_check = [
(
"DAG",
- list(_find_cls_attrs(dag_mod, "DAG", ignore=["full_filepath",
"task_group"])),
+ list(
+ _find_cls_attrs(dag_mod, "DAG", ignore=["full_filepath",
"task_group", "sla_miss_callback"])
+ ),
"dag",
_find_dag_deco(dag_mod),
"dag_id",
diff --git a/task-sdk/src/airflow/sdk/definitions/baseoperator.py
b/task-sdk/src/airflow/sdk/definitions/baseoperator.py
index 4d81350bc42..629edce61a8 100644
--- a/task-sdk/src/airflow/sdk/definitions/baseoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/baseoperator.py
@@ -692,17 +692,8 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
way to limit concurrency for certain tasks
:param pool_slots: the number of pool slots this task should use (>= 1)
Values less than 1 are not allowed.
- :param sla: time by which the job is expected to succeed. Note that
- this represents the ``timedelta`` after the period is closed. For
- example if you set an SLA of 1 hour, the scheduler would send an email
- soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
- has not succeeded yet.
- The scheduler pays special attention for jobs with an SLA and
- sends alert
- emails for SLA misses. SLA misses are also recorded in the database
- for future reference. All tasks that share the same SLA time
- get bundled in a single email, sent soon after that time. SLA
- notification are sent once and only once for each task instance.
+ :param sla: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be
replaced with a
+ new implementation in Airflow >=3.1.
:param execution_timeout: max time allowed for the execution of
this task instance, if it goes beyond it will raise and fail.
:param on_failure_callback: a function or list of functions to be called
when a task instance
@@ -888,7 +879,6 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
"depends_on_past",
"wait_for_downstream",
"priority_weight",
- "sla",
"execution_timeout",
"on_execute_callback",
"on_failure_callback",
@@ -1087,7 +1077,11 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
if self.pool_slots < 1:
dag_str = f" in dag {dag.dag_id}" if dag else ""
raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot
be less than 1")
- self.sla = sla
+ if sla is not None:
+ warnings.warn(
+ "The SLA feature is removed in Airflow 3.0, to be replaced
with a new implementation in >=3.1",
+ stacklevel=2,
+ )
if not TriggerRule.is_valid(trigger_rule):
raise ValueError(
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index 5fe2fb93c50..64a076346a0 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -23,6 +23,7 @@ import itertools
import logging
import os
import sys
+import warnings
import weakref
from collections import abc
from collections.abc import Collection, Iterable, MutableSet
@@ -406,7 +407,7 @@ class DAG:
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)),
)
- # sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None
+ sla_miss_callback: None = attrs.field(default=None)
catchup: bool = attrs.field(
factory=_config_bool_factory("scheduler", "catchup_by_default"),
)
@@ -552,6 +553,15 @@ class DAG:
def _has_on_failure_callback(self) -> bool:
return self.on_failure_callback is not None
+ @sla_miss_callback.validator
+ def _validate_sla_miss_callback(self, _, value):
+ if value is not None:
+ warnings.warn(
+ "The SLA feature is removed in Airflow 3.0, to be replaced
with a new implementation in >=3.1",
+ stacklevel=2,
+ )
+ return value
+
def __repr__(self):
return f"<DAG: {self.dag_id}>"
@@ -1030,7 +1040,7 @@ DAG._DAG__serialized_fields = frozenset(a.name for a in
attrs.fields(DAG)) - {
"_log",
"task_dict",
"template_searchpath",
- # "sla_miss_callback",
+ "sla_miss_callback",
"on_success_callback",
"on_failure_callback",
"template_undefined",
@@ -1063,7 +1073,6 @@ if TYPE_CHECKING:
max_active_runs: int = ...,
max_consecutive_failed_dag_runs: int = ...,
dagrun_timeout: timedelta | None = None,
- # sla_miss_callback: Any = None,
catchup: bool = ...,
on_success_callback: None | DagStateChangeCallback |
list[DagStateChangeCallback] = None,
on_failure_callback: None | DagStateChangeCallback |
list[DagStateChangeCallback] = None,