This is an automated email from the ASF dual-hosted git repository.

qian pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e0e887  Introduce RESTARTING state (#16681)
0e0e887 is described below

commit 0e0e88792b04c78c2b0d5114e0078be847270da9
Author: yuqian90 <[email protected]>
AuthorDate: Sat Jul 31 18:32:45 2021 +0800

    Introduce RESTARTING state (#16681)
    
    closes: #16680
    
    This PR makes sure that when a user clears a running task, the task does 
not fail. Instead it is killed and retried gracefully.
    
    This is done by introducing a new State called RESTARTING. As the name 
suggests, a TaskInstance is set to this state when it's cleared while running. 
Most places handle RESTARTING the same way SHUTDOWN is handled, except 
in TaskInstance.is_eligible_to_retry, where it is always treated as eligible 
for retry.
---
 UPDATING.md                                        |   4 ++++
 airflow/jobs/base_job.py                           |   2 +-
 airflow/models/dagrun.py                           |   2 +-
 airflow/models/taskinstance.py                     |  11 +++++++++--
 airflow/utils/state.py                             |  11 ++++++++++-
 airflow/www/static/css/graph.css                   |   4 ++++
 airflow/www/static/css/tree.css                    |   4 ++++
 airflow/www/static/js/graph.js                     |   2 +-
 docs/apache-airflow/concepts/tasks.rst             |   2 ++
 docs/apache-airflow/img/task_lifecycle_diagram.png | Bin 135225 -> 58659 bytes
 tests/models/test_dag.py                           |   2 +-
 tests/www/views/test_views_base.py                 |   2 +-
 12 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 4777b0e..b18dff5 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -133,6 +133,10 @@ If you are using DAGs Details API endpoint, use 
`max_active_tasks` instead of `c
 
 When marking a task success/failed in Graph View, its downstream tasks that 
are in failed/upstream_failed state are automatically cleared.
 
+### Clearing a running task sets its state to `RESTARTING`
+
+Previously, clearing a running task set its state to `SHUTDOWN`. The task 
gets killed and goes into `FAILED` state. After 
[#16681](https://github.com/apache/airflow/pull/16681), clearing a running task 
sets its state to `RESTARTING`. The task is eligible for retry without going 
into `FAILED` state.
+
 ### Remove `TaskInstance.log_filepath` attribute
 
 This method returned incorrect values for a long time, because it did not take 
into account the different
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 18893f2..745f248 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -202,7 +202,7 @@ class BaseJob(Base, LoggingMixin):
                 session.merge(self)
                 previous_heartbeat = self.latest_heartbeat
 
-            if self.state == State.SHUTDOWN:
+            if self.state in State.terminating_states:
                 self.kill()
 
             # Figure out how long to sleep for
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index e2057d5..1fdd26c 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -477,7 +477,7 @@ class DagRun(Base, LoggingMixin):
         schedulable_tis: List[TI] = []
         changed_tis = False
 
-        tis = list(self.get_task_instances(session=session, 
state=State.task_states + (State.SHUTDOWN,)))
+        tis = list(self.get_task_instances(session=session, 
state=State.task_states))
         self.log.debug("number of tis tasks for %s: %s task(s)", self, 
len(tis))
         for ti in tis:
             try:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6fb437b..11fa590 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -158,7 +158,9 @@ def clear_task_instances(
     for ti in tis:
         if ti.state == State.RUNNING:
             if ti.job_id:
-                ti.state = State.SHUTDOWN
+                # If a task is cleared when running, set its state to 
RESTARTING so that
+                # the task is terminated and becomes eligible for retry.
+                ti.state = State.RESTARTING
                 job_ids.append(ti.job_id)
         else:
             task_id = ti.task_id
@@ -211,7 +213,7 @@ def clear_task_instances(
         from airflow.jobs.base_job import BaseJob
 
         for job in 
session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():
-            job.state = State.SHUTDOWN
+            job.state = State.RESTARTING
 
     if activate_dag_runs is not None:
         warnings.warn(
@@ -1519,6 +1521,11 @@ class TaskInstance(Base, LoggingMixin):
 
     def is_eligible_to_retry(self):
         """Is task instance is eligible for retry"""
+        if self.state == State.RESTARTING:
+            # If a task is cleared when running, it goes into RESTARTING state 
and is always
+            # eligible for retry
+            return True
+
         return self.task.retries and self.try_number <= self.max_tries
 
     @provide_session
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index e95b409..f408c94 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -39,7 +39,8 @@ class TaskInstanceState(str, Enum):
     QUEUED = "queued"  # Executor has enqueued the task
     RUNNING = "running"  # Task is executing
     SUCCESS = "success"  # Task completed
-    SHUTDOWN = "shutdown"  # External request to shut down
+    SHUTDOWN = "shutdown"  # External request to shut down (e.g. marked failed 
when running)
+    RESTARTING = "restarting"  # External request to restart (e.g. cleared 
when running)
     FAILED = "failed"  # Task errored out
     UP_FOR_RETRY = "up_for_retry"  # Task failed but has retries left
     UP_FOR_RESCHEDULE = "up_for_reschedule"  # A waiting `reschedule` sensor
@@ -84,6 +85,7 @@ class State:
     SCHEDULED = TaskInstanceState.SCHEDULED
     QUEUED = TaskInstanceState.QUEUED
     SHUTDOWN = TaskInstanceState.SHUTDOWN
+    RESTARTING = TaskInstanceState.RESTARTING
     UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
     UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
     UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
@@ -105,6 +107,7 @@ class State:
         TaskInstanceState.RUNNING: 'lime',
         TaskInstanceState.SUCCESS: 'green',
         TaskInstanceState.SHUTDOWN: 'blue',
+        TaskInstanceState.RESTARTING: 'violet',
         TaskInstanceState.FAILED: 'red',
         TaskInstanceState.UP_FOR_RETRY: 'gold',
         TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
@@ -159,6 +162,7 @@ class State:
             TaskInstanceState.RUNNING,
             TaskInstanceState.SENSING,
             TaskInstanceState.SHUTDOWN,
+            TaskInstanceState.RESTARTING,
             TaskInstanceState.UP_FOR_RETRY,
             TaskInstanceState.UP_FOR_RESCHEDULE,
         ]
@@ -182,6 +186,11 @@ class State:
     A list of states indicating that a task or dag is a success state.
     """
 
+    terminating_states = frozenset([TaskInstanceState.SHUTDOWN, 
TaskInstanceState.RESTARTING])
+    """
+    A list of states indicating that a task has been terminated.
+    """
+
 
 class PokeState:
     """Static class with poke states constants used in smart operator."""
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index f4c7b94..ce76df7 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -148,6 +148,10 @@ g.node.shutdown rect {
   stroke: blue;
 }
 
+g.node.restarting rect {
+  stroke: violet;
+}
+
 g.node.upstream_failed rect {
   stroke: orange;
 }
diff --git a/airflow/www/static/css/tree.css b/airflow/www/static/css/tree.css
index 05b2c81..c17cf0a 100644
--- a/airflow/www/static/css/tree.css
+++ b/airflow/www/static/css/tree.css
@@ -67,6 +67,10 @@ rect.shutdown {
   fill: blue;
 }
 
+rect.restarting {
+  fill: violet;
+}
+
 rect.upstream_failed {
   fill: orange;
 }
diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index d3d913b..5b1ffee 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -536,7 +536,7 @@ function getNodeState(nodeId, tis) {
   // In this order, if any of these states appeared in childrenStates, return 
it as
   // the group state.
   const priority = ['failed', 'upstream_failed', 'up_for_retry', 
'up_for_reschedule',
-    'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'removed',
+    'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'restarting', 
'removed',
     'no_status', 'success', 'skipped'];
 
   return priority.find((state) => childrenStates.has(state)) || 'no_status';
diff --git a/docs/apache-airflow/concepts/tasks.rst 
b/docs/apache-airflow/concepts/tasks.rst
index d4a6608..f481bab 100644
--- a/docs/apache-airflow/concepts/tasks.rst
+++ b/docs/apache-airflow/concepts/tasks.rst
@@ -70,6 +70,8 @@ The possible states for a Task Instance are:
 * ``queued``: The task has been assigned to an Executor and is awaiting a 
worker
 * ``running``: The task is running on a worker (or on a local/synchronous 
executor)
 * ``success``: The task finished running without errors
+* ``shutdown``: The task was externally requested to shut down when it was 
running
+* ``restarting``: The task was externally requested to restart when it was 
running
 * ``failed``: The task had an error during execution and failed to run
 * ``skipped``: The task was skipped due to branching, LatestOnly, or similar.
 * ``upstream_failed``: An upstream task failed and the :ref:`Trigger Rule 
<concepts:trigger-rules>` says we needed it
diff --git a/docs/apache-airflow/img/task_lifecycle_diagram.png 
b/docs/apache-airflow/img/task_lifecycle_diagram.png
index ad0bd9e..810942f 100644
Binary files a/docs/apache-airflow/img/task_lifecycle_diagram.png and 
b/docs/apache-airflow/img/task_lifecycle_diagram.png differ
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 7cbda66..726c07b 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1432,7 +1432,7 @@ class TestDag(unittest.TestCase):
 
     @parameterized.expand(
         [(state, State.NONE) for state in State.task_states if state != 
State.RUNNING]
-        + [(State.RUNNING, State.SHUTDOWN)]
+        + [(State.RUNNING, State.RESTARTING)]
     )  # type: ignore
     def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]):
         dag_id = 'test_clear_dag'
diff --git a/tests/www/views/test_views_base.py 
b/tests/www/views/test_views_base.py
index 84a1ea6..537f661 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -58,7 +58,7 @@ def test_home(capture_templates, admin_client):
         val_state_color_mapping = (
             'const STATE_COLOR = {"failed": "red", '
             '"null": "lightblue", "queued": "gray", '
-            '"removed": "lightgrey", "running": "lime", '
+            '"removed": "lightgrey", "restarting": "violet", "running": 
"lime", '
             '"scheduled": "tan", "sensing": "lightseagreen", '
             '"shutdown": "blue", "skipped": "pink", '
             '"success": "green", "up_for_reschedule": "turquoise", '

Reply via email to