This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 583caf8710f Make `sla` params no-op with deprecation warning (#48460)
583caf8710f is described below

commit 583caf8710f2820f5d65ec02174ec354720ad348
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Mar 28 01:33:48 2025 +0530

    Make `sla` params no-op with deprecation warning (#48460)
    
    This will ensure that existing 2.x dags with SLAs don't break.
---
 .../unit/serialization/test_dag_serialization.py     |  1 -
 .../ci/pre_commit/check_init_decorator_arguments.py  |  4 +++-
 task-sdk/src/airflow/sdk/definitions/baseoperator.py | 20 +++++++-------------
 task-sdk/src/airflow/sdk/definitions/dag.py          | 15 ++++++++++++---
 4 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 4cfa53f7be0..c0cb347b4dc 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -1395,7 +1395,6 @@ class TestStringifiedDAGs:
             "retry_delay": timedelta(0, 300),
             "retry_exponential_backoff": False,
             "run_as_user": None,
-            "sla": None,
             "start_date": None,
             "start_from_trigger": False,
             "start_trigger_args": None,
diff --git a/scripts/ci/pre_commit/check_init_decorator_arguments.py b/scripts/ci/pre_commit/check_init_decorator_arguments.py
index 5eec4c4eb41..f3236c733ac 100755
--- a/scripts/ci/pre_commit/check_init_decorator_arguments.py
+++ b/scripts/ci/pre_commit/check_init_decorator_arguments.py
@@ -209,7 +209,9 @@ def check_dag_init_decorator_arguments() -> int:
     items_to_check = [
         (
             "DAG",
-            list(_find_cls_attrs(dag_mod, "DAG", ignore=["full_filepath", "task_group"])),
+            list(
+                _find_cls_attrs(dag_mod, "DAG", ignore=["full_filepath", "task_group", "sla_miss_callback"])
+            ),
             "dag",
             _find_dag_deco(dag_mod),
             "dag_id",
diff --git a/task-sdk/src/airflow/sdk/definitions/baseoperator.py b/task-sdk/src/airflow/sdk/definitions/baseoperator.py
index 4d81350bc42..629edce61a8 100644
--- a/task-sdk/src/airflow/sdk/definitions/baseoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/baseoperator.py
@@ -692,17 +692,8 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         way to limit concurrency for certain tasks
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for SLA misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a
+        new implementation in Airflow >=3.1.
     :param execution_timeout: max time allowed for the execution of
         this task instance, if it goes beyond it will raise and fail.
     :param on_failure_callback: a function or list of functions to be called when a task instance
@@ -888,7 +879,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         "depends_on_past",
         "wait_for_downstream",
         "priority_weight",
-        "sla",
         "execution_timeout",
         "on_execute_callback",
         "on_failure_callback",
@@ -1087,7 +1077,11 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         if self.pool_slots < 1:
             dag_str = f" in dag {dag.dag_id}" if dag else ""
             raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot be less than 1")
-        self.sla = sla
+        if sla is not None:
+            warnings.warn(
+                "The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in >=3.1",
+                stacklevel=2,
+            )
 
         if not TriggerRule.is_valid(trigger_rule):
             raise ValueError(
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 5fe2fb93c50..64a076346a0 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -23,6 +23,7 @@ import itertools
 import logging
 import os
 import sys
+import warnings
 import weakref
 from collections import abc
 from collections.abc import Collection, Iterable, MutableSet
@@ -406,7 +407,7 @@ class DAG:
         default=None,
         validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)),
     )
-    # sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None
+    sla_miss_callback: None = attrs.field(default=None)
     catchup: bool = attrs.field(
         factory=_config_bool_factory("scheduler", "catchup_by_default"),
     )
@@ -552,6 +553,15 @@ class DAG:
     def _has_on_failure_callback(self) -> bool:
         return self.on_failure_callback is not None
 
+    @sla_miss_callback.validator
+    def _validate_sla_miss_callback(self, _, value):
+        if value is not None:
+            warnings.warn(
+                "The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in >=3.1",
+                stacklevel=2,
+            )
+        return value
+
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
@@ -1030,7 +1040,7 @@ DAG._DAG__serialized_fields = frozenset(a.name for a in attrs.fields(DAG)) - {
     "_log",
     "task_dict",
     "template_searchpath",
-    # "sla_miss_callback",
+    "sla_miss_callback",
     "on_success_callback",
     "on_failure_callback",
     "template_undefined",
@@ -1063,7 +1073,6 @@ if TYPE_CHECKING:
         max_active_runs: int = ...,
         max_consecutive_failed_dag_runs: int = ...,
         dagrun_timeout: timedelta | None = None,
-        # sla_miss_callback: Any = None,
         catchup: bool = ...,
         on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
         on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,

Reply via email to