This is an automated email from the ASF dual-hosted git repository.
qian pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e0e887 Introduce RESTARTING state (#16681)
0e0e887 is described below
commit 0e0e88792b04c78c2b0d5114e0078be847270da9
Author: yuqian90 <[email protected]>
AuthorDate: Sat Jul 31 18:32:45 2021 +0800
Introduce RESTARTING state (#16681)
closes: #16680
This PR makes sure that when a user clears a running task, the task does
not fail. Instead it is killed and retried gracefully.
This is done by introducing a new State called RESTARTING. As the name
suggests, a TaskInstance is set to this state when it's cleared while running.
Most places handle RESTARTING the same way SHUTDOWN is handled, except
in TaskInstance.is_eligible_to_retry, where it is always treated as eligible
for retry.
---
UPDATING.md | 4 ++++
airflow/jobs/base_job.py | 2 +-
airflow/models/dagrun.py | 2 +-
airflow/models/taskinstance.py | 11 +++++++++--
airflow/utils/state.py | 11 ++++++++++-
airflow/www/static/css/graph.css | 4 ++++
airflow/www/static/css/tree.css | 4 ++++
airflow/www/static/js/graph.js | 2 +-
docs/apache-airflow/concepts/tasks.rst | 2 ++
docs/apache-airflow/img/task_lifecycle_diagram.png | Bin 135225 -> 58659 bytes
tests/models/test_dag.py | 2 +-
tests/www/views/test_views_base.py | 2 +-
12 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 4777b0e..b18dff5 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -133,6 +133,10 @@ If you are using DAGs Details API endpoint, use
`max_active_tasks` instead of `c
When marking a task success/failed in Graph View, its downstream tasks that
are in failed/upstream_failed state are automatically cleared.
+### Clearing a running task sets its state to `RESTARTING`
+
+Previously, clearing a running task set its state to `SHUTDOWN`. The task
was killed and went into the `FAILED` state. After
[#16681](https://github.com/apache/airflow/pull/16681), clearing a running task
sets its state to `RESTARTING`. The task is eligible for retry without going
into `FAILED` state.
+
### Remove `TaskInstance.log_filepath` attribute
This method returned incorrect values for a long time, because it did not take
into account the different
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 18893f2..745f248 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -202,7 +202,7 @@ class BaseJob(Base, LoggingMixin):
session.merge(self)
previous_heartbeat = self.latest_heartbeat
- if self.state == State.SHUTDOWN:
+ if self.state in State.terminating_states:
self.kill()
# Figure out how long to sleep for
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index e2057d5..1fdd26c 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -477,7 +477,7 @@ class DagRun(Base, LoggingMixin):
schedulable_tis: List[TI] = []
changed_tis = False
- tis = list(self.get_task_instances(session=session,
state=State.task_states + (State.SHUTDOWN,)))
+ tis = list(self.get_task_instances(session=session,
state=State.task_states))
self.log.debug("number of tis tasks for %s: %s task(s)", self,
len(tis))
for ti in tis:
try:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6fb437b..11fa590 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -158,7 +158,9 @@ def clear_task_instances(
for ti in tis:
if ti.state == State.RUNNING:
if ti.job_id:
- ti.state = State.SHUTDOWN
+ # If a task is cleared when running, set its state to
RESTARTING so that
+ # the task is terminated and becomes eligible for retry.
+ ti.state = State.RESTARTING
job_ids.append(ti.job_id)
else:
task_id = ti.task_id
@@ -211,7 +213,7 @@ def clear_task_instances(
from airflow.jobs.base_job import BaseJob
for job in
session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():
- job.state = State.SHUTDOWN
+ job.state = State.RESTARTING
if activate_dag_runs is not None:
warnings.warn(
@@ -1519,6 +1521,11 @@ class TaskInstance(Base, LoggingMixin):
def is_eligible_to_retry(self):
"""Is task instance is eligible for retry"""
+ if self.state == State.RESTARTING:
+ # If a task is cleared when running, it goes into RESTARTING state
and is always
+ # eligible for retry
+ return True
+
return self.task.retries and self.try_number <= self.max_tries
@provide_session
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index e95b409..f408c94 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -39,7 +39,8 @@ class TaskInstanceState(str, Enum):
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
- SHUTDOWN = "shutdown" # External request to shut down
+ SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed
when running)
+ RESTARTING = "restarting" # External request to restart (e.g. cleared
when running)
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
@@ -84,6 +85,7 @@ class State:
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
+ RESTARTING = TaskInstanceState.RESTARTING
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
@@ -105,6 +107,7 @@ class State:
TaskInstanceState.RUNNING: 'lime',
TaskInstanceState.SUCCESS: 'green',
TaskInstanceState.SHUTDOWN: 'blue',
+ TaskInstanceState.RESTARTING: 'violet',
TaskInstanceState.FAILED: 'red',
TaskInstanceState.UP_FOR_RETRY: 'gold',
TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
@@ -159,6 +162,7 @@ class State:
TaskInstanceState.RUNNING,
TaskInstanceState.SENSING,
TaskInstanceState.SHUTDOWN,
+ TaskInstanceState.RESTARTING,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
]
@@ -182,6 +186,11 @@ class State:
A list of states indicating that a task or dag is a success state.
"""
+ terminating_states = frozenset([TaskInstanceState.SHUTDOWN,
TaskInstanceState.RESTARTING])
+ """
+ A list of states indicating that a task has been terminated.
+ """
+
class PokeState:
"""Static class with poke states constants used in smart operator."""
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index f4c7b94..ce76df7 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -148,6 +148,10 @@ g.node.shutdown rect {
stroke: blue;
}
+g.node.restarting rect {
+ stroke: violet;
+}
+
g.node.upstream_failed rect {
stroke: orange;
}
diff --git a/airflow/www/static/css/tree.css b/airflow/www/static/css/tree.css
index 05b2c81..c17cf0a 100644
--- a/airflow/www/static/css/tree.css
+++ b/airflow/www/static/css/tree.css
@@ -67,6 +67,10 @@ rect.shutdown {
fill: blue;
}
+rect.restarting {
+ fill: violet;
+}
+
rect.upstream_failed {
fill: orange;
}
diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index d3d913b..5b1ffee 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -536,7 +536,7 @@ function getNodeState(nodeId, tis) {
// In this order, if any of these states appeared in childrenStates, return
it as
// the group state.
const priority = ['failed', 'upstream_failed', 'up_for_retry',
'up_for_reschedule',
- 'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'removed',
+ 'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'restarting',
'removed',
'no_status', 'success', 'skipped'];
return priority.find((state) => childrenStates.has(state)) || 'no_status';
diff --git a/docs/apache-airflow/concepts/tasks.rst
b/docs/apache-airflow/concepts/tasks.rst
index d4a6608..f481bab 100644
--- a/docs/apache-airflow/concepts/tasks.rst
+++ b/docs/apache-airflow/concepts/tasks.rst
@@ -70,6 +70,8 @@ The possible states for a Task Instance are:
* ``queued``: The task has been assigned to an Executor and is awaiting a
worker
* ``running``: The task is running on a worker (or on a local/synchronous
executor)
* ``success``: The task finished running without errors
+* ``shutdown``: The task was externally requested to shut down when it was
running
+* ``restarting``: The task was externally requested to restart when it was
running
* ``failed``: The task had an error during execution and failed to run
* ``skipped``: The task was skipped due to branching, LatestOnly, or similar.
* ``upstream_failed``: An upstream task failed and the :ref:`Trigger Rule
<concepts:trigger-rules>` says we needed it
diff --git a/docs/apache-airflow/img/task_lifecycle_diagram.png
b/docs/apache-airflow/img/task_lifecycle_diagram.png
index ad0bd9e..810942f 100644
Binary files a/docs/apache-airflow/img/task_lifecycle_diagram.png and
b/docs/apache-airflow/img/task_lifecycle_diagram.png differ
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 7cbda66..726c07b 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1432,7 +1432,7 @@ class TestDag(unittest.TestCase):
@parameterized.expand(
[(state, State.NONE) for state in State.task_states if state !=
State.RUNNING]
- + [(State.RUNNING, State.SHUTDOWN)]
+ + [(State.RUNNING, State.RESTARTING)]
) # type: ignore
def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]):
dag_id = 'test_clear_dag'
diff --git a/tests/www/views/test_views_base.py
b/tests/www/views/test_views_base.py
index 84a1ea6..537f661 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -58,7 +58,7 @@ def test_home(capture_templates, admin_client):
val_state_color_mapping = (
'const STATE_COLOR = {"failed": "red", '
'"null": "lightblue", "queued": "gray", '
- '"removed": "lightgrey", "running": "lime", '
+ '"removed": "lightgrey", "restarting": "violet", "running":
"lime", '
'"scheduled": "tan", "sensing": "lightseagreen", '
'"shutdown": "blue", "skipped": "pink", '
'"success": "green", "up_for_reschedule": "turquoise", '