This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a76af4f22c0 Rename fail stop dag property to fail fast (#45327)
a76af4f22c0 is described below
commit a76af4f22c0f34813ec51f00cd0e2a4909c77cbf
Author: hprassad <[email protected]>
AuthorDate: Tue Jan 21 07:34:41 2025 +0530
Rename fail stop dag property to fail fast (#45327)
* Renamed fail_stop DAG property to fail_fast (#45229)
* made changes to missed occurrences (#45229)
* Renamed fail_stop DAG property to fail_fast (#45229)
* Added newsfragments for the PR #45327
* docs(newsfragment): update change type and migration rule
* ci(github-actions): fix ci typo
---------
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Amogh Desai <[email protected]>
---
.github/workflows/news-fragment.yml | 4 ++--
airflow/exceptions.py | 12 ++++++------
airflow/models/dag.py | 6 +++---
airflow/models/taskinstance.py | 20 ++++++++++----------
newsfragments/45327.significant.rst | 23 +++++++++++++++++++++++
task_sdk/src/airflow/sdk/definitions/dag.py | 15 +++++++--------
tests/models/test_baseoperator.py | 18 +++++++++---------
tests/models/test_dag.py | 18 +++++++++---------
tests/models/test_mappedoperator.py | 18 +++++++++---------
tests/models/test_taskinstance.py | 6 +++---
10 files changed, 81 insertions(+), 59 deletions(-)
diff --git a/.github/workflows/news-fragment.yml
b/.github/workflows/news-fragment.yml
index 46cb294d7a5..979a1148a6e 100644
--- a/.github/workflows/news-fragment.yml
+++ b/.github/workflows/news-fragment.yml
@@ -61,13 +61,13 @@ jobs:
BASE_REF: ${{ github.base_ref }}
run: >
change_types=(
- 'DAG changes'
+ 'Dag changes'
'Config changes'
'API changes'
'CLI changes'
'Behaviour changes'
'Plugin changes'
- 'Dependency change'
+ 'Dependency changes'
)
news_fragment_content=`git diff origin/${BASE_REF}
newsfragments/*.significant.rst`
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index fd4fbf6758f..89358e76bd0 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -289,23 +289,23 @@ class DagFileExists(AirflowBadRequest):
warnings.warn("DagFileExists is deprecated and will be removed.",
DeprecationWarning, stacklevel=2)
-class FailStopDagInvalidTriggerRule(AirflowException):
- """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger
rule."""
+class FailFastDagInvalidTriggerRule(AirflowException):
+ """Raise when a dag has 'fail_fast' enabled yet has a non-default trigger
rule."""
_allowed_rules = (TriggerRule.ALL_SUCCESS,
TriggerRule.ALL_DONE_SETUP_SUCCESS)
@classmethod
- def check(cls, *, fail_stop: bool, trigger_rule: TriggerRule):
+ def check(cls, *, fail_fast: bool, trigger_rule: TriggerRule):
"""
- Check that fail_stop dag tasks have allowable trigger rules.
+ Check that fail_fast dag tasks have allowable trigger rules.
:meta private:
"""
- if fail_stop and trigger_rule not in cls._allowed_rules:
+ if fail_fast and trigger_rule not in cls._allowed_rules:
raise cls()
def __str__(self) -> str:
- return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS}
trigger rule"
+ return f"A 'fail_fast' dag can only have {TriggerRule.ALL_SUCCESS}
trigger rule"
class DuplicateTaskIdFound(AirflowException):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 770552b1b3b..7da56bca624 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -419,9 +419,9 @@ class DAG(TaskSDKDag, LoggingMixin):
Can be used as an HTTP link (for example the link to your Slack
channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a
``with`` block
- :param fail_stop: Fails currently running tasks when task in DAG fails.
- **Warning**: A fail stop dag can only have tasks with the default
trigger rule ("all_success").
- An exception will be thrown if any task in a fail stop dag has a non
default trigger rule.
+ :param fail_fast: Fails currently running tasks when task in DAG fails.
+ **Warning**: A fail fast dag can only have tasks with the default
trigger rule ("all_success").
+ An exception will be thrown if any task in a fail fast dag has a non
default trigger rule.
:param dag_display_name: The display name of the DAG which appears on the
UI.
"""
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5e0f4001d2d..5a605f40cd5 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -409,10 +409,10 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance,
session: Session):
task = task_instance.task.dag.task_dict[ti.task_id]
if not task.is_teardown:
if ti.state == TaskInstanceState.RUNNING:
- log.info("Forcing task %s to fail due to dag's `fail_stop`
setting", ti.task_id)
+ log.info("Forcing task %s to fail due to dag's `fail_fast`
setting", ti.task_id)
ti.error(session)
else:
- log.info("Setting task %s to SKIPPED due to dag's `fail_stop`
setting.", ti.task_id)
+ log.info("Setting task %s to SKIPPED due to dag's `fail_fast`
setting.", ti.task_id)
ti.set_state(state=TaskInstanceState.SKIPPED, session=session)
else:
log.info("Not skipping teardown task '%s'", ti.task_id)
@@ -1083,7 +1083,7 @@ def _handle_failure(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
- fail_stop: bool = False,
+ fail_fast: bool = False,
) -> None:
"""
Handle Failure for a task instance.
@@ -1106,7 +1106,7 @@ def _handle_failure(
context=context,
force_fail=force_fail,
session=session,
- fail_stop=fail_stop,
+ fail_fast=fail_fast,
)
_log_state(task_instance=task_instance, lead_msg="Immediate failure
requested. " if force_fail else "")
@@ -3065,7 +3065,7 @@ class TaskInstance(Base, LoggingMixin):
force_fail: bool = False,
*,
session: Session,
- fail_stop: bool = False,
+ fail_fast: bool = False,
):
"""
Fetch the context needed to handle a failure.
@@ -3076,7 +3076,7 @@ class TaskInstance(Base, LoggingMixin):
:param context: Jinja2 context
:param force_fail: if True, task does not retry
:param session: SQLAlchemy ORM Session
- :param fail_stop: if True, fail all downstream tasks
+ :param fail_fast: if True, fail all downstream tasks
"""
if error:
if isinstance(error, BaseException):
@@ -3133,7 +3133,7 @@ class TaskInstance(Base, LoggingMixin):
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None
- if task and fail_stop:
+ if task and fail_fast:
_stop_remaining_tasks(task_instance=ti, session=session)
else:
if ti.state == TaskInstanceState.RUNNING:
@@ -3190,9 +3190,9 @@ class TaskInstance(Base, LoggingMixin):
assert self.task
assert self.task.dag
try:
- fail_stop = self.task.dag.fail_stop
+ fail_fast = self.task.dag.fail_fast
except Exception:
- fail_stop = False
+ fail_fast = False
_handle_failure(
task_instance=self,
error=error,
@@ -3200,7 +3200,7 @@ class TaskInstance(Base, LoggingMixin):
test_mode=test_mode,
context=context,
force_fail=force_fail,
- fail_stop=fail_stop,
+ fail_fast=fail_fast,
)
def is_eligible_to_retry(self):
diff --git a/newsfragments/45327.significant.rst
b/newsfragments/45327.significant.rst
new file mode 100644
index 00000000000..db385eb7059
--- /dev/null
+++ b/newsfragments/45327.significant.rst
@@ -0,0 +1,23 @@
+Renamed DAG argument ``fail_stop`` to ``fail_fast`` across the codebase to
align with Airflow 3.0.
+
+
+* Types of change
+
+ * [x] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [ ] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
+
+* Migration rules needed
+
+ * ruff
+
+ * AIR302
+
+ * arguments in ``DAG``
+
+ * [ ] ``fail_stop`` → ``fail_fast``
diff --git a/task_sdk/src/airflow/sdk/definitions/dag.py
b/task_sdk/src/airflow/sdk/definitions/dag.py
index 016de8d3136..f4b71ec9958 100644
--- a/task_sdk/src/airflow/sdk/definitions/dag.py
+++ b/task_sdk/src/airflow/sdk/definitions/dag.py
@@ -47,7 +47,7 @@ from dateutil.relativedelta import relativedelta
from airflow import settings
from airflow.exceptions import (
DuplicateTaskIdFound,
- FailStopDagInvalidTriggerRule,
+ FailFastDagInvalidTriggerRule,
ParamValidationError,
TaskNotFound,
)
@@ -346,7 +346,7 @@ class DAG:
Can be used as an HTTP link (for example the link to your Slack
channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a
``with`` block
- :param fail_stop: Fails currently running tasks when task in DAG fails.
+ :param fail_fast: Fails currently running tasks when task in DAG fails.
**Warning**: A fail stop dag can only have tasks with the default
trigger rule ("all_success").
An exception will be thrown if any task in a fail stop dag has a non
default trigger rule.
:param dag_display_name: The display name of the DAG which appears on the
UI.
@@ -413,7 +413,7 @@ class DAG:
tags: MutableSet[str] = attrs.field(factory=set, converter=_convert_tags)
owner_links: dict[str, str] = attrs.field(factory=dict)
auto_register: bool = attrs.field(default=True, converter=bool)
- fail_stop: bool = attrs.field(default=False, converter=bool)
+ fail_fast: bool = attrs.field(default=False, converter=bool)
dag_display_name: str = attrs.field(
default=attrs.Factory(_default_dag_display_name, takes_self=True),
validator=attrs.validators.instance_of(str),
@@ -497,8 +497,7 @@ class DAG:
elif isinstance(schedule, Collection) and not isinstance(schedule,
str):
if not all(isinstance(x, BaseAsset) for x in schedule):
raise ValueError(
- "All elements in 'schedule' should be either assets, "
- "asset references, or asset aliases"
+ "All elements in 'schedule' should be either assets, asset
references, or asset aliases"
)
return AssetTriggeredTimetable(AssetAll(*schedule))
else:
@@ -928,7 +927,7 @@ class DAG:
# Add task_id to used_group_ids to prevent group_id and task_id
collisions.
self.task_group.used_group_ids.add(task_id)
- FailStopDagInvalidTriggerRule.check(fail_stop=self.fail_stop,
trigger_rule=task.trigger_rule)
+ FailFastDagInvalidTriggerRule.check(fail_fast=self.fail_fast,
trigger_rule=task.trigger_rule)
def add_tasks(self, tasks: Iterable[Operator]) -> None:
"""
@@ -1022,7 +1021,7 @@ DAG._DAG__serialized_fields = frozenset(a.name for a in
attrs.fields(DAG)) - {
"has_on_success_callback",
"has_on_failure_callback",
"auto_register",
- "fail_stop",
+ "fail_fast",
"schedule",
}
@@ -1058,7 +1057,7 @@ if TYPE_CHECKING:
tags: Collection[str] | None = None,
owner_links: dict[str, str] | None = None,
auto_register: bool = True,
- fail_stop: bool = False,
+ fail_fast: bool = False,
dag_display_name: str | None = None,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
diff --git a/tests/models/test_baseoperator.py
b/tests/models/test_baseoperator.py
index bc601099744..0ff8896746a 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -71,24 +71,24 @@ class TestBaseOperator:
def test_trigger_rule_validation(self):
from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
- fail_stop_dag = DAG(
+ fail_fast_dag = DAG(
dag_id="test_dag_trigger_rule_validation",
schedule=None,
start_date=DEFAULT_DATE,
- fail_stop=True,
+ fail_fast=True,
)
- non_fail_stop_dag = DAG(
+ non_fail_fast_dag = DAG(
dag_id="test_dag_trigger_rule_validation",
schedule=None,
start_date=DEFAULT_DATE,
- fail_stop=False,
+ fail_fast=False,
)
# An operator with default trigger rule and a fail-stop dag should be
allowed
- BaseOperator(task_id="test_valid_trigger_rule", dag=fail_stop_dag,
trigger_rule=DEFAULT_TRIGGER_RULE)
+ BaseOperator(task_id="test_valid_trigger_rule", dag=fail_fast_dag,
trigger_rule=DEFAULT_TRIGGER_RULE)
# An operator with non default trigger rule and a non fail-stop dag
should be allowed
BaseOperator(
- task_id="test_valid_trigger_rule", dag=non_fail_stop_dag,
trigger_rule=TriggerRule.ALWAYS
+ task_id="test_valid_trigger_rule", dag=non_fail_fast_dag,
trigger_rule=TriggerRule.ALWAYS
)
def test_cross_downstream(self):
@@ -454,13 +454,13 @@ def get_states(dr):
@pytest.mark.db_test
-def test_teardown_and_fail_stop(dag_maker):
+def test_teardown_and_fail_fast(dag_maker):
"""
- when fail_stop enabled, teardowns should run according to their setups.
+ when fail_fast enabled, teardowns should run according to their setups.
in this case, the second teardown skips because its setup skips.
"""
- with dag_maker(fail_stop=True) as dag:
+ with dag_maker(fail_fast=True) as dag:
for num in (1, 2):
with TaskGroup(f"tg_{num}"):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 38f2c26c1ff..fb3519b7089 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1428,40 +1428,40 @@ class TestDag:
def test_dag_add_task_checks_trigger_rule(self):
# A non fail stop dag should allow any trigger rule
- from airflow.exceptions import FailStopDagInvalidTriggerRule
+ from airflow.exceptions import FailFastDagInvalidTriggerRule
from airflow.utils.trigger_rule import TriggerRule
task_with_non_default_trigger_rule = EmptyOperator(
task_id="task_with_non_default_trigger_rule",
trigger_rule=TriggerRule.ALWAYS
)
- non_fail_stop_dag = DAG(
+ non_fail_fast_dag = DAG(
dag_id="test_dag_add_task_checks_trigger_rule",
schedule=None,
start_date=DEFAULT_DATE,
- fail_stop=False,
+ fail_fast=False,
)
- non_fail_stop_dag.add_task(task_with_non_default_trigger_rule)
+ non_fail_fast_dag.add_task(task_with_non_default_trigger_rule)
# a fail stop dag should allow default trigger rule
from airflow.models.abstractoperator import DEFAULT_TRIGGER_RULE
- fail_stop_dag = DAG(
+ fail_fast_dag = DAG(
dag_id="test_dag_add_task_checks_trigger_rule",
schedule=None,
start_date=DEFAULT_DATE,
- fail_stop=True,
+ fail_fast=True,
)
task_with_default_trigger_rule = EmptyOperator(
task_id="task_with_default_trigger_rule",
trigger_rule=DEFAULT_TRIGGER_RULE
)
- fail_stop_dag.add_task(task_with_default_trigger_rule)
+ fail_fast_dag.add_task(task_with_default_trigger_rule)
# a fail stop dag should not allow a non-default trigger rule
task_with_non_default_trigger_rule = EmptyOperator(
task_id="task_with_non_default_trigger_rule",
trigger_rule=TriggerRule.ALWAYS
)
- with pytest.raises(FailStopDagInvalidTriggerRule):
- fail_stop_dag.add_task(task_with_non_default_trigger_rule)
+ with pytest.raises(FailFastDagInvalidTriggerRule):
+ fail_fast_dag.add_task(task_with_non_default_trigger_rule)
def test_dag_add_task_sets_default_task_group(self):
dag = DAG(dag_id="test_dag_add_task_sets_default_task_group",
schedule=None, start_date=DEFAULT_DATE)
diff --git a/tests/models/test_mappedoperator.py
b/tests/models/test_mappedoperator.py
index e683a00b963..ff107d8204d 100644
--- a/tests/models/test_mappedoperator.py
+++ b/tests/models/test_mappedoperator.py
@@ -1580,12 +1580,12 @@ class TestMappedSetupTeardown:
}
assert states == expected
- def test_one_to_many_with_teardown_and_fail_stop(self, dag_maker):
+ def test_one_to_many_with_teardown_and_fail_fast(self, dag_maker):
"""
- With fail_stop enabled, the teardown for an already-completed setup
+ With fail_fast enabled, the teardown for an already-completed setup
should not be skipped.
"""
- with dag_maker(fail_stop=True) as dag:
+ with dag_maker(fail_fast=True) as dag:
@task
def my_setup():
@@ -1616,12 +1616,12 @@ class TestMappedSetupTeardown:
}
assert states == expected
- def test_one_to_many_with_teardown_and_fail_stop_more_tasks(self,
dag_maker):
+ def test_one_to_many_with_teardown_and_fail_fast_more_tasks(self,
dag_maker):
"""
- when fail_stop enabled, teardowns should run according to their setups.
+ when fail_fast enabled, teardowns should run according to their setups.
in this case, the second teardown skips because its setup skips.
"""
- with dag_maker(fail_stop=True) as dag:
+ with dag_maker(fail_fast=True) as dag:
for num in (1, 2):
with TaskGroup(f"tg_{num}"):
@@ -1658,12 +1658,12 @@ class TestMappedSetupTeardown:
}
assert states == expected
- def
test_one_to_many_with_teardown_and_fail_stop_more_tasks_mapped_setup(self,
dag_maker):
+ def
test_one_to_many_with_teardown_and_fail_fast_more_tasks_mapped_setup(self,
dag_maker):
"""
- when fail_stop enabled, teardowns should run according to their setups.
+ when fail_fast enabled, teardowns should run according to their setups.
in this case, the second teardown skips because its setup skips.
"""
- with dag_maker(fail_stop=True) as dag:
+ with dag_maker(fail_fast=True) as dag:
for num in (1, 2):
with TaskGroup(f"tg_{num}"):
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 29506c66761..fca20b4bed0 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -3649,19 +3649,19 @@ class TestTaskInstance:
ti.handle_failure("test ti.task undefined")
@provide_session
- def test_handle_failure_fail_stop(self, create_dummy_dag, session=None):
+ def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
start_date = timezone.datetime(2016, 6, 1)
clear_db_runs()
dag, task1 = create_dummy_dag(
- dag_id="test_handle_failure_fail_stop",
+ dag_id="test_handle_failure_fail_fast",
schedule=None,
start_date=start_date,
task_id="task1",
trigger_rule="all_success",
with_dagrun_type=DagRunType.MANUAL,
session=session,
- fail_stop=True,
+ fail_fast=True,
)
logical_date = timezone.utcnow()
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if
AIRFLOW_V_3_0_PLUS else {}