This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a76af4f22c0 Rename fail stop dag property to fail fast (#45327)
a76af4f22c0 is described below

commit a76af4f22c0f34813ec51f00cd0e2a4909c77cbf
Author: hprassad <[email protected]>
AuthorDate: Tue Jan 21 07:34:41 2025 +0530

    Rename fail stop dag property to fail fast (#45327)
    
    * Renamed fail_stop DAG property to fail_fast (#45229)
    
    * made changes to missed occurrences (#45229)
    
    * Renamed fail_stop DAG property to fail_fast (#45229)
    
    * Added newsfragments for the PR #45327
    
    * docs(newsfragment): update change type and migration rule
    
    * ci(github-actions): fix ci typo
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Amogh Desai <[email protected]>
---
 .github/workflows/news-fragment.yml         |  4 ++--
 airflow/exceptions.py                       | 12 ++++++------
 airflow/models/dag.py                       |  6 +++---
 airflow/models/taskinstance.py              | 20 ++++++++++----------
 newsfragments/45327.significant.rst         | 23 +++++++++++++++++++++++
 task_sdk/src/airflow/sdk/definitions/dag.py | 15 +++++++--------
 tests/models/test_baseoperator.py           | 18 +++++++++---------
 tests/models/test_dag.py                    | 18 +++++++++---------
 tests/models/test_mappedoperator.py         | 18 +++++++++---------
 tests/models/test_taskinstance.py           |  6 +++---
 10 files changed, 81 insertions(+), 59 deletions(-)

diff --git a/.github/workflows/news-fragment.yml 
b/.github/workflows/news-fragment.yml
index 46cb294d7a5..979a1148a6e 100644
--- a/.github/workflows/news-fragment.yml
+++ b/.github/workflows/news-fragment.yml
@@ -61,13 +61,13 @@ jobs:
           BASE_REF: ${{ github.base_ref }}
         run: >
           change_types=(
-            'DAG changes'
+            'Dag changes'
             'Config changes'
             'API changes'
             'CLI changes'
             'Behaviour changes'
             'Plugin changes'
-            'Dependency change'
+            'Dependency changes'
           )
           news_fragment_content=`git diff origin/${BASE_REF} 
newsfragments/*.significant.rst`
 
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index fd4fbf6758f..89358e76bd0 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -289,23 +289,23 @@ class DagFileExists(AirflowBadRequest):
         warnings.warn("DagFileExists is deprecated and will be removed.", 
DeprecationWarning, stacklevel=2)
 
 
-class FailStopDagInvalidTriggerRule(AirflowException):
-    """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger 
rule."""
+class FailFastDagInvalidTriggerRule(AirflowException):
+    """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger 
rule."""
 
     _allowed_rules = (TriggerRule.ALL_SUCCESS, 
TriggerRule.ALL_DONE_SETUP_SUCCESS)
 
     @classmethod
-    def check(cls, *, fail_stop: bool, trigger_rule: TriggerRule):
+    def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
         """
-        Check that fail_stop dag tasks have allowable trigger rules.
+        Check that fail_fast dag tasks have allowable trigger rules.
 
         :meta private:
         """
-        if fail_stop and trigger_rule not in cls._allowed_rules:
+        if fail_fast and trigger_rule not in cls._allowed_rules:
             raise cls()
 
     def __str__(self) -> str:
-        return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS} 
trigger rule"
+        return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS} 
trigger rule"
 
 
 class DuplicateTaskIdFound(AirflowException):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 770552b1b3b..7da56bca624 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -419,9 +419,9 @@ class DAG(TaskSDKDag, LoggingMixin):
         Can be used as an HTTP link (for example the link to your Slack 
channel), or a mailto link.
         e.g: {"dag_owner": "https://airflow.apache.org/"}
     :param auto_register: Automatically register this DAG when it is used in a 
``with`` block
-    :param fail_stop: Fails currently running tasks when task in DAG fails.
-        **Warning**: A fail stop dag can only have tasks with the default 
trigger rule ("all_success").
-        An exception will be thrown if any task in a fail stop dag has a non 
default trigger rule.
+    :param fail_fast: Fails currently running tasks when task in DAG fails.
+        **Warning**: A fail fast dag can only have tasks with the default 
trigger rule ("all_success").
+        An exception will be thrown if any task in a fail fast dag has a non 
default trigger rule.
     :param dag_display_name: The display name of the DAG which appears on the 
UI.
     """
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5e0f4001d2d..5a605f40cd5 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -409,10 +409,10 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, 
session: Session):
         task = task_instance.task.dag.task_dict[ti.task_id]
         if not task.is_teardown:
             if ti.state == TaskInstanceState.RUNNING:
-                log.info("Forcing task %s to fail due to dag's `fail_stop` 
setting", ti.task_id)
+                log.info("Forcing task %s to fail due to dag's `fail_fast` 
setting", ti.task_id)
                 ti.error(session)
             else:
-                log.info("Setting task %s to SKIPPED due to dag's `fail_stop` 
setting.", ti.task_id)
+                log.info("Setting task %s to SKIPPED due to dag's `fail_fast` 
setting.", ti.task_id)
                 ti.set_state(state=TaskInstanceState.SKIPPED, session=session)
         else:
             log.info("Not skipping teardown task '%s'", ti.task_id)
@@ -1083,7 +1083,7 @@ def _handle_failure(
     test_mode: bool | None = None,
     context: Context | None = None,
     force_fail: bool = False,
-    fail_stop: bool = False,
+    fail_fast: bool = False,
 ) -> None:
     """
     Handle Failure for a task instance.
@@ -1106,7 +1106,7 @@ def _handle_failure(
         context=context,
         force_fail=force_fail,
         session=session,
-        fail_stop=fail_stop,
+        fail_fast=fail_fast,
     )
 
     _log_state(task_instance=task_instance, lead_msg="Immediate failure 
requested. " if force_fail else "")
@@ -3065,7 +3065,7 @@ class TaskInstance(Base, LoggingMixin):
         force_fail: bool = False,
         *,
         session: Session,
-        fail_stop: bool = False,
+        fail_fast: bool = False,
     ):
         """
         Fetch the context needed to handle a failure.
@@ -3076,7 +3076,7 @@ class TaskInstance(Base, LoggingMixin):
         :param context: Jinja2 context
         :param force_fail: if True, task does not retry
         :param session: SQLAlchemy ORM Session
-        :param fail_stop: if True, fail all downstream tasks
+        :param fail_fast: if True, fail all downstream tasks
         """
         if error:
             if isinstance(error, BaseException):
@@ -3133,7 +3133,7 @@ class TaskInstance(Base, LoggingMixin):
             email_for_state = operator.attrgetter("email_on_failure")
             callbacks = task.on_failure_callback if task else None
 
-            if task and fail_stop:
+            if task and fail_fast:
                 _stop_remaining_tasks(task_instance=ti, session=session)
         else:
             if ti.state == TaskInstanceState.RUNNING:
@@ -3190,9 +3190,9 @@ class TaskInstance(Base, LoggingMixin):
             assert self.task
             assert self.task.dag
         try:
-            fail_stop = self.task.dag.fail_stop
+            fail_fast = self.task.dag.fail_fast
         except Exception:
-            fail_stop = False
+            fail_fast = False
         _handle_failure(
             task_instance=self,
             error=error,
@@ -3200,7 +3200,7 @@ class TaskInstance(Base, LoggingMixin):
             test_mode=test_mode,
             context=context,
             force_fail=force_fail,
-            fail_stop=fail_stop,
+            fail_fast=fail_fast,
         )
 
     def is_eligible_to_retry(self):
diff --git a/newsfragments/45327.significant.rst 
b/newsfragments/45327.significant.rst
new file mode 100644
index 00000000000..db385eb7059
--- /dev/null
+++ b/newsfragments/45327.significant.rst
@@ -0,0 +1,23 @@
+Renamed DAG argument ``fail_stop`` to ``fail_fast`` across the codebase to 
align with Airflow 3.0.
+
+
+* Types of change
+
+  * [x] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [ ] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
+
+* Migration rules needed
+
+  * ruff
+
+    * AIR302
+
+      * arguments in ``DAG``
+
+          * [ ] ``fail_stop`` → ``fail_fast``
diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py 
b/task_sdk/src/airflow/sdk/definitions/dag.py
index 016de8d3136..f4b71ec9958 100644
--- a/task_sdk/src/airflow/sdk/definitions/dag.py
+++ b/task_sdk/src/airflow/sdk/definitions/dag.py
@@ -47,7 +47,7 @@ from dateutil.relativedelta import relativedelta
 from airflow import settings
 from airflow.exceptions import (
     DuplicateTaskIdFound,
-    FailStopDagInvalidTriggerRule,
+    FailFastDagInvalidTriggerRule,
     ParamValidationError,
     TaskNotFound,
 )
@@ -346,7 +346,7 @@ class DAG:
         Can be used as an HTTP link (for example the link to your Slack 
channel), or a mailto link.
         e.g: {"dag_owner": "https://airflow.apache.org/"}
     :param auto_register: Automatically register this DAG when it is used in a 
``with`` block
-    :param fail_stop: Fails currently running tasks when task in DAG fails.
+    :param fail_fast: Fails currently running tasks when task in DAG fails.
         **Warning**: A fail stop dag can only have tasks with the default 
trigger rule ("all_success").
         An exception will be thrown if any task in a fail stop dag has a non 
default trigger rule.
     :param dag_display_name: The display name of the DAG which appears on the 
UI.
@@ -413,7 +413,7 @@ class DAG:
     tags: MutableSet[str] = attrs.field(factory=set, converter=_convert_tags)
     owner_links: dict[str, str] = attrs.field(factory=dict)
     auto_register: bool = attrs.field(default=True, converter=bool)
-    fail_stop: bool = attrs.field(default=False, converter=bool)
+    fail_fast: bool = attrs.field(default=False, converter=bool)
     dag_display_name: str = attrs.field(
         default=attrs.Factory(_default_dag_display_name, takes_self=True),
         validator=attrs.validators.instance_of(str),
@@ -497,8 +497,7 @@ class DAG:
         elif isinstance(schedule, Collection) and not isinstance(schedule, 
str):
             if not all(isinstance(x, BaseAsset) for x in schedule):
                 raise ValueError(
-                    "All elements in 'schedule' should be either assets, "
-                    "asset references, or asset aliases"
+                    "All elements in 'schedule' should be either assets, asset 
references, or asset aliases"
                 )
             return AssetTriggeredTimetable(AssetAll(*schedule))
         else:
@@ -928,7 +927,7 @@ class DAG:
             # Add task_id to used_group_ids to prevent group_id and task_id 
collisions.
             self.task_group.used_group_ids.add(task_id)
 
-        FailStopDagInvalidTriggerRule.check(fail_stop=self.fail_stop, 
trigger_rule=task.trigger_rule)
+        FailFastDagInvalidTriggerRule.check(fail_fast=self.fail_fast, 
trigger_rule=task.trigger_rule)
 
     def add_tasks(self, tasks: Iterable[Operator]) -> None:
         """
@@ -1022,7 +1021,7 @@ DAG._DAG__serialized_fields = frozenset(a.name for a in 
attrs.fields(DAG)) - {
     "has_on_success_callback",
     "has_on_failure_callback",
     "auto_register",
-    "fail_stop",
+    "fail_fast",
     "schedule",
 }
 
@@ -1058,7 +1057,7 @@ if TYPE_CHECKING:
         tags: Collection[str] | None = None,
         owner_links: dict[str, str] | None = None,
         auto_register: bool = True,
-        fail_stop: bool = False,
+        fail_fast: bool = False,
         dag_display_name: str | None = None,
     ) -> Callable[[Callable], Callable[..., DAG]]:
         """
diff --git a/tests/models/test_baseoperator.py 
b/tests/models/test_baseoperator.py
index bc601099744..0ff8896746a 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -71,24 +71,24 @@ class TestBaseOperator:
     def test_trigger_rule_validation(self):
         from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
 
-        fail_stop_dag = DAG(
+        fail_fast_dag = DAG(
             dag_id="test_dag_trigger_rule_validation",
             schedule=None,
             start_date=DEFAULT_DATE,
-            fail_stop=True,
+            fail_fast=True,
         )
-        non_fail_stop_dag = DAG(
+        non_fail_fast_dag = DAG(
             dag_id="test_dag_trigger_rule_validation",
             schedule=None,
             start_date=DEFAULT_DATE,
-            fail_stop=False,
+            fail_fast=False,
         )
 
         # An operator with default trigger rule and a fail-stop dag should be 
allowed
-        BaseOperator(task_id="test_valid_trigger_rule", dag=fail_stop_dag, 
trigger_rule=DEFAULT_TRIGGER_RULE)
+        BaseOperator(task_id="test_valid_trigger_rule", dag=fail_fast_dag, 
trigger_rule=DEFAULT_TRIGGER_RULE)
         # An operator with non default trigger rule and a non fail-stop dag 
should be allowed
         BaseOperator(
-            task_id="test_valid_trigger_rule", dag=non_fail_stop_dag, 
trigger_rule=TriggerRule.ALWAYS
+            task_id="test_valid_trigger_rule", dag=non_fail_fast_dag, 
trigger_rule=TriggerRule.ALWAYS
         )
 
     def test_cross_downstream(self):
@@ -454,13 +454,13 @@ def get_states(dr):
 
 
 @pytest.mark.db_test
-def test_teardown_and_fail_stop(dag_maker):
+def test_teardown_and_fail_fast(dag_maker):
     """
-    when fail_stop enabled, teardowns should run according to their setups.
+    when fail_fast enabled, teardowns should run according to their setups.
     in this case, the second teardown skips because its setup skips.
     """
 
-    with dag_maker(fail_stop=True) as dag:
+    with dag_maker(fail_fast=True) as dag:
         for num in (1, 2):
             with TaskGroup(f"tg_{num}"):
 
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 38f2c26c1ff..fb3519b7089 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1428,40 +1428,40 @@ class TestDag:
 
     def test_dag_add_task_checks_trigger_rule(self):
         # A non fail stop dag should allow any trigger rule
-        from airflow.exceptions import FailStopDagInvalidTriggerRule
+        from airflow.exceptions import FailFastDagInvalidTriggerRule
         from airflow.utils.trigger_rule import TriggerRule
 
         task_with_non_default_trigger_rule = EmptyOperator(
             task_id="task_with_non_default_trigger_rule", 
trigger_rule=TriggerRule.ALWAYS
         )
-        non_fail_stop_dag = DAG(
+        non_fail_fast_dag = DAG(
             dag_id="test_dag_add_task_checks_trigger_rule",
             schedule=None,
             start_date=DEFAULT_DATE,
-            fail_stop=False,
+            fail_fast=False,
         )
-        non_fail_stop_dag.add_task(task_with_non_default_trigger_rule)
+        non_fail_fast_dag.add_task(task_with_non_default_trigger_rule)
 
         # a fail stop dag should allow default trigger rule
         from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
 
-        fail_stop_dag = DAG(
+        fail_fast_dag = DAG(
             dag_id="test_dag_add_task_checks_trigger_rule",
             schedule=None,
             start_date=DEFAULT_DATE,
-            fail_stop=True,
+            fail_fast=True,
         )
         task_with_default_trigger_rule = EmptyOperator(
             task_id="task_with_default_trigger_rule", 
trigger_rule=DEFAULT_TRIGGER_RULE
         )
-        fail_stop_dag.add_task(task_with_default_trigger_rule)
+        fail_fast_dag.add_task(task_with_default_trigger_rule)
 
         # a fail stop dag should not allow a non-default trigger rule
         task_with_non_default_trigger_rule = EmptyOperator(
             task_id="task_with_non_default_trigger_rule", 
trigger_rule=TriggerRule.ALWAYS
         )
-        with pytest.raises(FailStopDagInvalidTriggerRule):
-            fail_stop_dag.add_task(task_with_non_default_trigger_rule)
+        with pytest.raises(FailFastDagInvalidTriggerRule):
+            fail_fast_dag.add_task(task_with_non_default_trigger_rule)
 
     def test_dag_add_task_sets_default_task_group(self):
         dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", 
schedule=None, start_date=DEFAULT_DATE)
diff --git a/tests/models/test_mappedoperator.py 
b/tests/models/test_mappedoperator.py
index e683a00b963..ff107d8204d 100644
--- a/tests/models/test_mappedoperator.py
+++ b/tests/models/test_mappedoperator.py
@@ -1580,12 +1580,12 @@ class TestMappedSetupTeardown:
         }
         assert states == expected
 
-    def test_one_to_many_with_teardown_and_fail_stop(self, dag_maker):
+    def test_one_to_many_with_teardown_and_fail_fast(self, dag_maker):
         """
-        With fail_stop enabled, the teardown for an already-completed setup
+        With fail_fast enabled, the teardown for an already-completed setup
         should not be skipped.
         """
-        with dag_maker(fail_stop=True) as dag:
+        with dag_maker(fail_fast=True) as dag:
 
             @task
             def my_setup():
@@ -1616,12 +1616,12 @@ class TestMappedSetupTeardown:
         }
         assert states == expected
 
-    def test_one_to_many_with_teardown_and_fail_stop_more_tasks(self, 
dag_maker):
+    def test_one_to_many_with_teardown_and_fail_fast_more_tasks(self, 
dag_maker):
         """
-        when fail_stop enabled, teardowns should run according to their setups.
+        when fail_fast enabled, teardowns should run according to their setups.
         in this case, the second teardown skips because its setup skips.
         """
-        with dag_maker(fail_stop=True) as dag:
+        with dag_maker(fail_fast=True) as dag:
             for num in (1, 2):
                 with TaskGroup(f"tg_{num}"):
 
@@ -1658,12 +1658,12 @@ class TestMappedSetupTeardown:
         }
         assert states == expected
 
-    def 
test_one_to_many_with_teardown_and_fail_stop_more_tasks_mapped_setup(self, 
dag_maker):
+    def 
test_one_to_many_with_teardown_and_fail_fast_more_tasks_mapped_setup(self, 
dag_maker):
         """
-        when fail_stop enabled, teardowns should run according to their setups.
+        when fail_fast enabled, teardowns should run according to their setups.
         in this case, the second teardown skips because its setup skips.
         """
-        with dag_maker(fail_stop=True) as dag:
+        with dag_maker(fail_fast=True) as dag:
             for num in (1, 2):
                 with TaskGroup(f"tg_{num}"):
 
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 29506c66761..fca20b4bed0 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3649,19 +3649,19 @@ class TestTaskInstance:
         ti.handle_failure("test ti.task undefined")
 
     @provide_session
-    def test_handle_failure_fail_stop(self, create_dummy_dag, session=None):
+    def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
         start_date = timezone.datetime(2016, 6, 1)
         clear_db_runs()
 
         dag, task1 = create_dummy_dag(
-            dag_id="test_handle_failure_fail_stop",
+            dag_id="test_handle_failure_fail_fast",
             schedule=None,
             start_date=start_date,
             task_id="task1",
             trigger_rule="all_success",
             with_dagrun_type=DagRunType.MANUAL,
             session=session,
-            fail_stop=True,
+            fail_fast=True,
         )
         logical_date = timezone.utcnow()
         triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}

Reply via email to