This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 9a1b5378066 [v2-10-test] Ensure teardown tasks are executed when DAG
run is set to failed (#45530) (#45581)
9a1b5378066 is described below
commit 9a1b537806641296a67d51297d6f7711f7ec8412
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Jan 11 20:27:50 2025 +0100
[v2-10-test] Ensure teardown tasks are executed when DAG run is set to
failed (#45530) (#45581)
* [v2-10-test] Ensure teardown tasks are executed when DAG run is set to
failed (#45530)
* Ensure teardown tasks are executed when DAG run is set to failed
* Also handle the case of setting DAG to success
* Add some documentation to behavior changes
* Add some documentation to behavior changes
(cherry picked from commit 1e8977a2ea24e989c6c57ee3cb8e7b6bc4cf6c56)
Co-authored-by: Jens Scheffler <[email protected]>
* Remove type hints only working in Airflow 3
---------
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
---
airflow/api/common/mark_tasks.py | 41 +++++++------
docs/apache-airflow/howto/setup-and-teardown.rst | 8 ++-
newsfragments/45530.significant.rst | 12 ++++
tests/api/common/test_mark_tasks.py | 74 ++++++++++++++++++++++++
4 files changed, 117 insertions(+), 18 deletions(-)
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 58ca737a571..6b656e85a69 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -411,15 +411,18 @@ def set_dag_run_state_to_success(
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")
+
+ # Mark all task instances of the dag run to success - except for teardown
as they need to complete work.
+ normal_tasks = [task for task in dag.tasks if not task.is_teardown]
+
# Mark the dag run to success.
- if commit:
+ if commit and len(normal_tasks) == len(dag.tasks):
_set_dag_run_state(dag.dag_id, run_id, DagRunState.SUCCESS, session)
- # Mark all task instances of the dag run to success.
- for task in dag.tasks:
+ for task in normal_tasks:
task.dag = dag
return set_state(
- tasks=dag.tasks,
+ tasks=normal_tasks,
run_id=run_id,
state=TaskInstanceState.SUCCESS,
commit=commit,
@@ -466,10 +469,6 @@ def set_dag_run_state_to_failed(
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")
- # Mark the dag run to failed.
- if commit:
- _set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session)
-
running_states = (
TaskInstanceState.RUNNING,
TaskInstanceState.DEFERRED,
@@ -478,25 +477,26 @@ def set_dag_run_state_to_failed(
# Mark only RUNNING task instances.
task_ids = [task.task_id for task in dag.tasks]
- tis = session.scalars(
+ running_tis: list[TaskInstance] = session.scalars(
select(TaskInstance).where(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id == run_id,
TaskInstance.task_id.in_(task_ids),
TaskInstance.state.in_(running_states),
)
- )
+ ).all()
- task_ids_of_running_tis = [task_instance.task_id for task_instance in tis]
+ # Do not kill teardown tasks
+ task_ids_of_running_tis = [ti.task_id for ti in running_tis if not
dag.task_dict[ti.task_id].is_teardown]
- tasks = []
+ running_tasks = []
for task in dag.tasks:
if task.task_id in task_ids_of_running_tis:
task.dag = dag
- tasks.append(task)
+ running_tasks.append(task)
# Mark non-finished tasks as SKIPPED.
- tis = session.scalars(
+ pending_tis: list[TaskInstance] = session.scalars(
select(TaskInstance).filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id == run_id,
@@ -510,12 +510,19 @@ def set_dag_run_state_to_failed(
)
).all()
+ # Do not skip teardown tasks
+ pending_normal_tis = [ti for ti in pending_tis if not
dag.task_dict[ti.task_id].is_teardown]
+
if commit:
- for ti in tis:
+ for ti in pending_normal_tis:
ti.set_state(TaskInstanceState.SKIPPED)
- return tis + set_state(
- tasks=tasks,
+ # Mark the dag run to failed if there is no pending teardown (else
this would not be scheduled later).
+ if not any(dag.task_dict[ti.task_id].is_teardown for ti in
(running_tis + pending_tis)):
+ _set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session)
+
+ return pending_normal_tis + set_state(
+ tasks=running_tasks,
run_id=run_id,
state=TaskInstanceState.FAILED,
commit=commit,
diff --git a/docs/apache-airflow/howto/setup-and-teardown.rst
b/docs/apache-airflow/howto/setup-and-teardown.rst
index 7afb3c4a350..c802c8bedaf 100644
--- a/docs/apache-airflow/howto/setup-and-teardown.rst
+++ b/docs/apache-airflow/howto/setup-and-teardown.rst
@@ -24,8 +24,9 @@ Key features of setup and teardown tasks:
* If you clear a task, its setups and teardowns will be cleared.
* By default, teardown tasks are ignored for the purpose of evaluating dag
run state.
- * A teardown task will run if its setup was successful, even if its work
tasks failed.
+ * A teardown task will run if its setup was successful, even if its work
tasks failed. But it will skip if the setup was skipped.
* Teardown tasks are ignored when setting dependencies against task groups.
+ * Teardown will also be carried out if the DAG run is manually set to
"failed" or "success" to ensure resources will be cleaned-up.
How setup and teardown works
""""""""""""""""""""""""""""
@@ -231,3 +232,8 @@ Trigger rule behavior for teardowns
"""""""""""""""""""""""""""""""""""
Teardowns use a (non-configurable) trigger rule called ALL_DONE_SETUP_SUCCESS.
With this rule, as long as all upstreams are done and at least one directly
connected setup is successful, the teardown will run. If all of a teardown's
setups were skipped or failed, those states will propagate to the teardown.
+
+Side-effect on manual DAG state changes
+"""""""""""""""""""""""""""""""""""""""
+
+As teardown tasks are often used to clean up resources, they also need to run
if the DAG run is manually terminated. To terminate a DAG run early, a user
can manually mark it as "success" or "failed", which kills all tasks
before completion. If the DAG contains teardown tasks, they will still be
executed. As a side effect, so that the teardown tasks can still be scheduled,
the DAG run is not set to a terminal state immediately when the user requests it.
diff --git a/newsfragments/45530.significant.rst
b/newsfragments/45530.significant.rst
new file mode 100644
index 00000000000..7e2ae8e8ac6
--- /dev/null
+++ b/newsfragments/45530.significant.rst
@@ -0,0 +1,12 @@
+Ensure teardown tasks are executed when DAG run is set to failed
+
+Previously, when a DAG run was manually set to the "failed" or "success" state,
the terminal state was applied to all tasks.
+This left a gap when setup and teardown tasks were defined: if
teardown tasks were used to clean up infrastructure
+or other resources, they were also skipped and thus resources could stay
allocated.
+
+From now on, when setup tasks have already been executed and the DAG is manually
set to "failed" or "success", the teardown
+tasks are executed. Teardown tasks are skipped if the setup was also skipped.
+
+As a side effect this means if the DAG contains teardown tasks, then the
manual marking of DAG as "failed" or "success"
+will need to keep the DAG in running state to ensure that teardown tasks will
be scheduled. They would not be scheduled
+if the DAG is directly set to "failed" or "success".
diff --git a/tests/api/common/test_mark_tasks.py
b/tests/api/common/test_mark_tasks.py
new file mode 100644
index 00000000000..0cf58ee74a6
--- /dev/null
+++ b/tests/api/common/test_mark_tasks.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+
+from airflow.api.common.mark_tasks import set_dag_run_state_to_failed,
set_dag_run_state_to_success
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstance import TaskInstance
+
+pytestmark = pytest.mark.db_test
+
+
+def test_set_dag_run_state_to_failed(dag_maker):
+ with dag_maker("TEST_DAG_1"):
+ with EmptyOperator(task_id="teardown").as_teardown():
+ EmptyOperator(task_id="running")
+ EmptyOperator(task_id="pending")
+ dr = dag_maker.create_dagrun()
+ for ti in dr.get_task_instances():
+ if ti.task_id == "running":
+ ti.set_state(TaskInstanceState.RUNNING)
+ dag_maker.session.flush()
+ assert dr.dag
+
+ updated_tis: list[TaskInstance] = set_dag_run_state_to_failed(
+ dag=dr.dag, run_id=dr.run_id, commit=True, session=dag_maker.session
+ )
+ assert len(updated_tis) == 2
+ task_dict = {ti.task_id: ti for ti in updated_tis}
+ assert task_dict["running"].state == TaskInstanceState.FAILED
+ assert task_dict["pending"].state == TaskInstanceState.SKIPPED
+ assert "teardown" not in task_dict
+
+
+def test_set_dag_run_state_to_success(dag_maker):
+ with dag_maker("TEST_DAG_1"):
+ with EmptyOperator(task_id="teardown").as_teardown():
+ EmptyOperator(task_id="running")
+ EmptyOperator(task_id="pending")
+ dr = dag_maker.create_dagrun()
+ for ti in dr.get_task_instances():
+ if ti.task_id == "running":
+ ti.set_state(TaskInstanceState.RUNNING)
+ dag_maker.session.flush()
+ assert dr.dag
+
+ updated_tis: list[TaskInstance] = set_dag_run_state_to_success(
+ dag=dr.dag, run_id=dr.run_id, commit=True, session=dag_maker.session
+ )
+ assert len(updated_tis) == 2
+ task_dict = {ti.task_id: ti for ti in updated_tis}
+ assert task_dict["running"].state == TaskInstanceState.SUCCESS
+ assert task_dict["pending"].state == TaskInstanceState.SUCCESS
+ assert "teardown" not in task_dict