This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a5bf7ff981adcb552aea76775f9f72170a1bbed0 Author: Lipu Fei <[email protected]> AuthorDate: Thu May 4 12:09:36 2023 +0200 Move TaskInstanceKey to a separate file (#31033) closes: #30988 Fixes a circular import of DagRun -> TaskInstance -> Sentry -> KubernetesExecutor -> kubernetes_helper_functions.py (*) -> TaskInstanceKey. Moving TaskInstanceKey out to a separate file will break the cycle from kubernetes_helper_functions.py -> taskinstance.py Co-authored-by: Lipu Fei <[email protected]> (cherry picked from commit ac46902154c060246dec942f921f7670015e6031) --- airflow/executors/base_executor.py | 3 +- airflow/executors/celery_executor.py | 3 +- airflow/executors/celery_kubernetes_executor.py | 3 +- airflow/executors/dask_executor.py | 2 +- airflow/executors/debug_executor.py | 3 +- airflow/executors/kubernetes_executor.py | 2 +- airflow/executors/local_executor.py | 3 +- airflow/executors/sequential_executor.py | 2 +- airflow/kubernetes/kubernetes_helper_functions.py | 2 +- airflow/models/baseoperator.py | 2 +- airflow/models/taskinstance.py | 37 +-------------- airflow/models/taskinstancekey.py | 54 ++++++++++++++++++++++ airflow/models/xcom.py | 2 +- airflow/operators/trigger_dagrun.py | 2 +- airflow/providers/amazon/aws/links/base_aws.py | 2 +- .../providers/databricks/operators/databricks.py | 2 +- airflow/providers/google/cloud/links/base.py | 2 +- airflow/providers/google/cloud/links/datafusion.py | 2 +- airflow/providers/google/cloud/links/dataproc.py | 2 +- .../providers/google/cloud/operators/bigquery.py | 2 +- .../google/cloud/operators/dataproc_metastore.py | 2 +- .../microsoft/azure/operators/data_factory.py | 2 +- airflow/providers/qubole/operators/qubole.py | 2 +- docs/apache-airflow/public-airflow-interface.rst | 15 ++++++ docs/conf.py | 1 + tests/executors/test_kubernetes_executor.py | 2 +- tests/jobs/test_backfill_job.py | 2 +- tests/models/test_xcom.py | 3 +- tests/test_utils/mock_executor.py | 2 +- 29 files changed, 103 insertions(+), 60 
deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index e49938c458..b63a7a514b 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -38,7 +38,8 @@ PARALLELISM: int = conf.getint("core", "PARALLELISM") if TYPE_CHECKING: from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest - from airflow.models.taskinstance import TaskInstance, TaskInstanceKey + from airflow.models.taskinstance import TaskInstance + from airflow.models.taskinstancekey import TaskInstanceKey # Command to execute - list of strings # the first element is always "airflow". diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 0f72df2a51..1094991288 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -56,7 +56,8 @@ from airflow.utils.timeout import timeout if TYPE_CHECKING: from airflow.executors.base_executor import CommandType, EventBufferValueType, TaskTuple - from airflow.models.taskinstance import TaskInstance, TaskInstanceKey + from airflow.models.taskinstance import TaskInstance + from airflow.models.taskinstancekey import TaskInstanceKey # Task instance that is sent over Celery queues # TaskInstanceKey, Command, queue_name, CallableTask diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py index c7bb8df62a..a4ab1bb804 100644 --- a/airflow/executors/celery_kubernetes_executor.py +++ b/airflow/executors/celery_kubernetes_executor.py @@ -28,7 +28,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType - from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey + from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance + from 
airflow.models.taskinstancekey import TaskInstanceKey class CeleryKubernetesExecutor(LoggingMixin): diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index c152abe4fa..1998e7c7df 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -36,7 +36,7 @@ from airflow.executors.base_executor import BaseExecutor if TYPE_CHECKING: from airflow.executors.base_executor import CommandType - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey # queue="default" is a special case since this is the base config default queue name, diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index 59b76d6937..ca23b09a67 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -32,7 +32,8 @@ from airflow.executors.base_executor import BaseExecutor from airflow.utils.state import State if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstance, TaskInstanceKey + from airflow.models.taskinstance import TaskInstance + from airflow.models.taskinstancekey import TaskInstanceKey class DebugExecutor(BaseExecutor): diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 80091aa827..72ed57a24e 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -54,7 +54,7 @@ from airflow.utils.state import State, TaskInstanceState if TYPE_CHECKING: from airflow.executors.base_executor import CommandType - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey # TaskInstance key, command, configuration, pod_template_file KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]] diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 6a9bb1a339..df11d49ae4 100644 --- 
a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -43,7 +43,8 @@ from airflow.utils.state import State if TYPE_CHECKING: from airflow.executors.base_executor import CommandType - from airflow.models.taskinstance import TaskInstanceKey, TaskInstanceStateType + from airflow.models.taskinstance import TaskInstanceStateType + from airflow.models.taskinstancekey import TaskInstanceKey # This is a work to be executed by a worker. # It can Key and Command - but it can also be None, None which is actually a diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index b3da5af080..28f88c6b87 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -32,7 +32,7 @@ from airflow.utils.state import State if TYPE_CHECKING: from airflow.executors.base_executor import CommandType - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey class SequentialExecutor(BaseExecutor): diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py index 7965e405ac..390eb0edb7 100644 --- a/airflow/kubernetes/kubernetes_helper_functions.py +++ b/airflow/kubernetes/kubernetes_helper_functions.py @@ -23,7 +23,7 @@ import string import pendulum from slugify import slugify -from airflow.models.taskinstance import TaskInstanceKey +from airflow.models.taskinstancekey import TaskInstanceKey log = logging.getLogger(__name__) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index cec321b2b3..02ad04765f 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -98,7 +98,7 @@ if TYPE_CHECKING: import jinja2 # Slow import. 
from airflow.models.dag import DAG - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom_arg import XComArg from airflow.utils.task_group import TaskGroup diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 87c1699402..8ee62043eb 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -32,7 +32,7 @@ from enum import Enum from functools import partial from pathlib import PurePath from types import TracebackType -from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, NamedTuple, Tuple +from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, Tuple from urllib.parse import quote import dill @@ -92,6 +92,7 @@ from airflow.models.log import Log from airflow.models.mappedoperator import MappedOperator from airflow.models.param import process_params from airflow.models.taskfail import TaskFail +from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import LazyXComAccess, XCom @@ -328,40 +329,6 @@ def _is_mappable_value(value: Any) -> TypeGuard[Collection]: return True -class TaskInstanceKey(NamedTuple): - """Key used to identify task instance.""" - - dag_id: str - task_id: str - run_id: str - try_number: int = 1 - map_index: int = -1 - - @property - def primary(self) -> tuple[str, str, str, int]: - """Return task instance primary key part of the key""" - return self.dag_id, self.task_id, self.run_id, self.map_index - - @property - def reduced(self) -> TaskInstanceKey: - """Remake the key by subtracting 1 from try number to match in memory information""" - return TaskInstanceKey( - self.dag_id, self.task_id, self.run_id, max(1, self.try_number - 1), self.map_index - ) - - def with_try_number(self, try_number: int) -> TaskInstanceKey: - """Returns 
TaskInstanceKey with provided ``try_number``""" - return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, try_number, self.map_index) - - @property - def key(self) -> TaskInstanceKey: - """For API-compatibly with TaskInstance. - - Returns self - """ - return self - - def _creator_note(val): """Custom creator for the ``note`` association proxy.""" if isinstance(val, str): diff --git a/airflow/models/taskinstancekey.py b/airflow/models/taskinstancekey.py new file mode 100644 index 0000000000..b34aaffd85 --- /dev/null +++ b/airflow/models/taskinstancekey.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import NamedTuple + + +class TaskInstanceKey(NamedTuple): + """Key used to identify task instance.""" + + dag_id: str + task_id: str + run_id: str + try_number: int = 1 + map_index: int = -1 + + @property + def primary(self) -> tuple[str, str, str, int]: + """Return task instance primary key part of the key""" + return self.dag_id, self.task_id, self.run_id, self.map_index + + @property + def reduced(self) -> TaskInstanceKey: + """Remake the key by subtracting 1 from try number to match in memory information""" + return TaskInstanceKey( + self.dag_id, self.task_id, self.run_id, max(1, self.try_number - 1), self.map_index + ) + + def with_try_number(self, try_number: int) -> TaskInstanceKey: + """Returns TaskInstanceKey with provided ``try_number``""" + return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, try_number, self.map_index) + + @property + def key(self) -> TaskInstanceKey: + """For API-compatibly with TaskInstance. + + Returns self + """ + return self diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index 476b170deb..f9e5abc774 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -68,7 +68,7 @@ from airflow.utils.xcom import ( log = logging.getLogger(__name__) if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey class BaseXCom(Base, LoggingMixin): diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index b0115636c3..f3560bffc7 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -46,7 +46,7 @@ XCOM_RUN_ID = "trigger_run_id" if TYPE_CHECKING: from sqlalchemy.orm.session import Session - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey class TriggerDagRunLink(BaseOperatorLink): diff --git a/airflow/providers/amazon/aws/links/base_aws.py 
b/airflow/providers/amazon/aws/links/base_aws.py index fba2f17e96..82acd337bf 100644 --- a/airflow/providers/amazon/aws/links/base_aws.py +++ b/airflow/providers/amazon/aws/links/base_aws.py @@ -23,7 +23,7 @@ from airflow.models import BaseOperatorLink, XCom if TYPE_CHECKING: from airflow.models import BaseOperator - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index fb9351818c..6ec44444c4 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -30,7 +30,7 @@ from airflow.providers.databricks.triggers.databricks import DatabricksExecution from airflow.providers.databricks.utils.databricks import normalise_json_content, validate_trigger_event if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context DEFER_METHOD_NAME = "execute_complete" diff --git a/airflow/providers/google/cloud/links/base.py b/airflow/providers/google/cloud/links/base.py index 755266758e..c898f9890e 100644 --- a/airflow/providers/google/cloud/links/base.py +++ b/airflow/providers/google/cloud/links/base.py @@ -23,7 +23,7 @@ from airflow.models import BaseOperatorLink, XCom if TYPE_CHECKING: from airflow.models import BaseOperator - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey BASE_LINK = "https://console.cloud.google.com" diff --git a/airflow/providers/google/cloud/links/datafusion.py b/airflow/providers/google/cloud/links/datafusion.py index ae1d9d4ada..fae57f97fd 100644 --- a/airflow/providers/google/cloud/links/datafusion.py +++ b/airflow/providers/google/cloud/links/datafusion.py @@ -24,7 +24,7 @@ from 
airflow.models import BaseOperatorLink, XCom if TYPE_CHECKING: from airflow.models import BaseOperator - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context diff --git a/airflow/providers/google/cloud/links/dataproc.py b/airflow/providers/google/cloud/links/dataproc.py index 573621aa14..4376d41cb0 100644 --- a/airflow/providers/google/cloud/links/dataproc.py +++ b/airflow/providers/google/cloud/links/dataproc.py @@ -25,7 +25,7 @@ from airflow.providers.google.cloud.links.base import BASE_LINK if TYPE_CHECKING: from airflow.models import BaseOperator - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context DATAPROC_BASE_LINK = BASE_LINK + "/dataproc" diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 9de5663420..a103fe0b53 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -52,7 +52,7 @@ from airflow.providers.google.cloud.triggers.bigquery import ( ) if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py index 3512483434..c7fe6fccf3 100644 --- a/airflow/providers/google/cloud/operators/dataproc_metastore.py +++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py @@ -37,7 +37,7 @@ from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseO from airflow.providers.google.common.links.storage import StorageLink if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstanceKey + from 
airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context diff --git a/airflow/providers/microsoft/azure/operators/data_factory.py b/airflow/providers/microsoft/azure/operators/data_factory.py index 1ad3723950..e1fdc997ca 100644 --- a/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/airflow/providers/microsoft/azure/operators/data_factory.py @@ -33,7 +33,7 @@ from airflow.providers.microsoft.azure.triggers.data_factory import AzureDataFac from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py index 710387663f..b7547eabb8 100644 --- a/airflow/providers/qubole/operators/qubole.py +++ b/airflow/providers/qubole/operators/qubole.py @@ -33,7 +33,7 @@ from airflow.providers.qubole.hooks.qubole import ( if TYPE_CHECKING: - from airflow.models.taskinstance import TaskInstanceKey + from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.context import Context diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 6d36de5df1..74d8657244 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -151,6 +151,21 @@ passed to the execute method of the operators via the :class:`~airflow.models.ta _api/airflow/models/taskinstance/index +Task Instance Keys +------------------ + +Task instance keys are unique identifiers of task instances in a DAG (in a DAG Run). A key is a tuple that consists of +``dag_id``, ``task_id``, ``run_id``, ``try_number``, and ``map_index``. The key of a task instance can be retrieved via +:meth:`~airflow.models.taskinstance.TaskInstance.key`. + +.. 
toctree:: + :includehidden: + :glob: + :maxdepth: 1 + + _api/airflow/models/taskinstancekey/index + + Hooks ----- diff --git a/docs/conf.py b/docs/conf.py index 5e0f81474a..e77fc22f99 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -246,6 +246,7 @@ if PACKAGE_NAME == "apache-airflow": "dagbag.py", "param.py", "taskinstance.py", + "taskinstancekey.py", "variable.py", "xcom.py", } diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 0205e3b10f..0df48e526e 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -32,7 +32,7 @@ from urllib3 import HTTPResponse from airflow import AirflowException from airflow.exceptions import PodReconciliationError -from airflow.models.taskinstance import TaskInstanceKey +from airflow.models.taskinstancekey import TaskInstanceKey from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils import timezone diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 37ed6f8a85..6bd4a1ab55 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -45,7 +45,7 @@ from airflow.listeners.listener import get_listener_manager from airflow.models import DagBag, Pool, TaskInstance as TI from airflow.models.dagrun import DagRun from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstanceKey +from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.taskmap import TaskMap from airflow.operators.empty import EmptyOperator from airflow.utils import timezone diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py index 6abdd344ac..a211a9324d 100644 --- a/tests/models/test_xcom.py +++ b/tests/models/test_xcom.py @@ -27,7 +27,8 @@ from sqlalchemy.orm import Session from airflow.configuration import conf from airflow.models.dagrun import DagRun, 
DagRunType -from airflow.models.taskinstance import TaskInstance, TaskInstanceKey +from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend from airflow.operators.empty import EmptyOperator from airflow.settings import json diff --git a/tests/test_utils/mock_executor.py b/tests/test_utils/mock_executor.py index 43d792dc8d..96d42152c2 100644 --- a/tests/test_utils/mock_executor.py +++ b/tests/test_utils/mock_executor.py @@ -21,7 +21,7 @@ from collections import defaultdict from unittest.mock import MagicMock from airflow.executors.base_executor import BaseExecutor -from airflow.models.taskinstance import TaskInstanceKey +from airflow.models.taskinstancekey import TaskInstanceKey from airflow.utils.session import create_session from airflow.utils.state import State
