This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8f963850be85aecd530b281a034468e375661c4c Author: Andrew Godwin <[email protected]> AuthorDate: Tue Jul 6 17:36:01 2021 -0600 Add State types for tasks and DAGs (#15285) This adds TaskState and DagState enum types that contain all possible states, makes all other core state constants derive their values from them, and adds a couple of initial type hints that use the new enums (with the plan being that we can add significantly more later). closes: #9387 (cherry picked from commit 2b7c59619b7dd6fd5031745ade7756466456f803) --- airflow/models/dagrun.py | 6 +- airflow/typing_compat.py | 2 +- airflow/utils/state.py | 169 +++++++++++++++++++++++++++++------------------ 3 files changed, 108 insertions(+), 69 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 66be9a3..6f47077 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -36,7 +36,7 @@ from airflow.utils import callback_requests, timezone from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks -from airflow.utils.state import DagRunState, State +from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.types import DagRunType if TYPE_CHECKING: @@ -288,7 +288,9 @@ class DagRun(Base, LoggingMixin): return f"{run_type}__{execution_date.isoformat()}" @provide_session - def get_task_instances(self, state=None, session=None) -> Iterable[TI]: + def get_task_instances( + self, state: Optional[Iterable[TaskInstanceState]] = None, session=None + ) -> Iterable[TI]: """Returns the task instances for this dag run""" tis = session.query(TI).filter( TI.dag_id == self.dag_id, diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py index 0f3db75..26237c2 100644 --- a/airflow/typing_compat.py +++ b/airflow/typing_compat.py @@ -22,7 +22,7 @@ codebase easier. 
""" try: - # Protocol and TypedDict are only added to typing module starting from + # Literal, Protocol and TypedDict are only added to typing module starting from # python 3.8 we can safely remove this shim import after Airflow drops # support for <3.8 from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 5ffcbd7..35dee81 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -16,70 +16,103 @@ # specific language governing permissions and limitations # under the License. +from enum import Enum +from typing import Dict, FrozenSet, Tuple + from airflow.settings import STATE_COLORS +from airflow.utils.types import Optional -class State: +class TaskInstanceState(str, Enum): """ - Static class with task instance states constants and color method to - avoid hardcoding. + Enum that represents all possible states that a Task Instance can be in. + + Note that None is also allowed, so always use this in a type hint with Optional. """ - # scheduler - NONE = None # type: None - REMOVED = "removed" - SCHEDULED = "scheduled" + # Set by the scheduler + # None - Task is created but should not run yet + REMOVED = "removed" # Task vanished from DAG before it ran + SCHEDULED = "scheduled" # Task should run and will be handed to executor soon + + # Set by the task instance itself + QUEUED = "queued" # Executor has enqueued the task + RUNNING = "running" # Task is executing + SUCCESS = "success" # Task completed + SHUTDOWN = "shutdown" # External request to shut down + FAILED = "failed" # Task errored out + UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left + UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor + UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed + SKIPPED = "skipped" # Skipped by branching or some other mechanism + SENSING = "sensing" # Smart sensor offloaded to the sensor DAG - # set by the executor (t.b.d.) 
- # LAUNCHED = "launched" + def __str__(self) -> str: # pylint: disable=invalid-str-returned + return self.value + + +class DagRunState(str, Enum): + """ + Enum that represents all possible states that a DagRun can be in. + + These are "shared" with TaskInstanceState in some parts of the code, + so please ensure that their values always match the ones with the + same name in TaskInstanceState. + """ QUEUED = "queued" RUNNING = "running" SUCCESS = "success" - SHUTDOWN = "shutdown" # External request to shut down FAILED = "failed" - UP_FOR_RETRY = "up_for_retry" - UP_FOR_RESCHEDULE = "up_for_reschedule" - UPSTREAM_FAILED = "upstream_failed" - SKIPPED = "skipped" - SENSING = "sensing" - - task_states = ( - SUCCESS, - RUNNING, - FAILED, - UPSTREAM_FAILED, - SKIPPED, - UP_FOR_RETRY, - UP_FOR_RESCHEDULE, - QUEUED, - NONE, - SCHEDULED, - SENSING, - REMOVED, - ) - dag_states = ( - SUCCESS, - RUNNING, - FAILED, - QUEUED, + +class State: + """ + Static class with task instance state constants and color methods to + avoid hardcoding. + """ + + # Backwards-compat constants for code that does not yet use the enum + # These first three are shared by DagState and TaskState + SUCCESS = TaskInstanceState.SUCCESS + RUNNING = TaskInstanceState.RUNNING + FAILED = TaskInstanceState.FAILED + + # These are TaskState only + NONE = None + REMOVED = TaskInstanceState.REMOVED + SCHEDULED = TaskInstanceState.SCHEDULED + QUEUED = TaskInstanceState.QUEUED + SHUTDOWN = TaskInstanceState.SHUTDOWN + UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY + UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE + UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED + SKIPPED = TaskInstanceState.SKIPPED + SENSING = TaskInstanceState.SENSING + + task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState) + + dag_states: Tuple[DagRunState, ...] 
= ( + DagRunState.SUCCESS, + DagRunState.RUNNING, + DagRunState.FAILED, + DagRunState.QUEUED, ) - state_color = { - QUEUED: 'gray', - RUNNING: 'lime', - SUCCESS: 'green', - SHUTDOWN: 'blue', - FAILED: 'red', - UP_FOR_RETRY: 'gold', - UP_FOR_RESCHEDULE: 'turquoise', - UPSTREAM_FAILED: 'orange', - SKIPPED: 'pink', - REMOVED: 'lightgrey', - SCHEDULED: 'tan', - NONE: 'lightblue', - SENSING: 'lightseagreen', + state_color: Dict[Optional[TaskInstanceState], str] = { + None: 'lightblue', + TaskInstanceState.QUEUED: 'gray', + TaskInstanceState.RUNNING: 'lime', + TaskInstanceState.SUCCESS: 'green', + TaskInstanceState.SHUTDOWN: 'blue', + TaskInstanceState.FAILED: 'red', + TaskInstanceState.UP_FOR_RETRY: 'gold', + TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise', + TaskInstanceState.UPSTREAM_FAILED: 'orange', + TaskInstanceState.SKIPPED: 'pink', + TaskInstanceState.REMOVED: 'lightgrey', + TaskInstanceState.SCHEDULED: 'tan', + TaskInstanceState.SENSING: 'lightseagreen', } state_color.update(STATE_COLORS) # type: ignore @@ -96,17 +129,17 @@ class State: return 'white' return 'black' - running = frozenset([RUNNING, SENSING]) + running: FrozenSet[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.SENSING]) """ A list of states indicating that a task is being executed. """ - finished = frozenset( + finished: FrozenSet[TaskInstanceState] = frozenset( [ - SUCCESS, - FAILED, - SKIPPED, - UPSTREAM_FAILED, + TaskInstanceState.SUCCESS, + TaskInstanceState.FAILED, + TaskInstanceState.SKIPPED, + TaskInstanceState.UPSTREAM_FAILED, ] ) """ @@ -118,16 +151,16 @@ class State: case, it is no longer running. 
""" - unfinished = frozenset( + unfinished: FrozenSet[Optional[TaskInstanceState]] = frozenset( [ - NONE, - SCHEDULED, - QUEUED, - RUNNING, - SENSING, - SHUTDOWN, - UP_FOR_RETRY, - UP_FOR_RESCHEDULE, + None, + TaskInstanceState.SCHEDULED, + TaskInstanceState.QUEUED, + TaskInstanceState.RUNNING, + TaskInstanceState.SENSING, + TaskInstanceState.SHUTDOWN, + TaskInstanceState.UP_FOR_RETRY, + TaskInstanceState.UP_FOR_RESCHEDULE, ] ) """ @@ -135,12 +168,16 @@ class State: a run or has not even started. """ - failed_states = frozenset([FAILED, UPSTREAM_FAILED]) + failed_states: FrozenSet[TaskInstanceState] = frozenset( + [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED] + ) """ A list of states indicating that a task or dag is a failed state. """ - success_states = frozenset([SUCCESS, SKIPPED]) + success_states: FrozenSet[TaskInstanceState] = frozenset( + [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED] + ) """ A list of states indicating that a task or dag is a success state. """
