This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ac46902154 Move TaskInstanceKey to a separate file (#31033)
ac46902154 is described below
commit ac46902154c060246dec942f921f7670015e6031
Author: Lipu Fei <[email protected]>
AuthorDate: Thu May 4 12:09:36 2023 +0200
Move TaskInstanceKey to a separate file (#31033)
closes: #30988
Fixes a circular import of DagRun -> TaskInstance -> Sentry ->
KubernetesExecutor -> kubernetes_helper_functions.py (*) ->
TaskInstanceKey.
Moving TaskInstanceKey out to a separate file will break the cycle from
kubernetes_helper_functions.py -> taskinstance.py
Co-authored-by: Lipu Fei <[email protected]>
---
airflow/executors/base_executor.py | 3 +-
airflow/executors/celery_executor.py | 3 +-
airflow/executors/celery_kubernetes_executor.py | 3 +-
airflow/executors/dask_executor.py | 2 +-
airflow/executors/debug_executor.py | 3 +-
airflow/executors/kubernetes_executor.py | 2 +-
airflow/executors/local_executor.py | 3 +-
airflow/executors/sequential_executor.py | 2 +-
airflow/kubernetes/kubernetes_helper_functions.py | 2 +-
airflow/models/baseoperator.py | 2 +-
airflow/models/taskinstance.py | 37 +--------------
airflow/models/taskinstancekey.py | 54 ++++++++++++++++++++++
airflow/models/xcom.py | 2 +-
airflow/operators/trigger_dagrun.py | 2 +-
airflow/providers/amazon/aws/links/base_aws.py | 2 +-
.../providers/databricks/operators/databricks.py | 2 +-
airflow/providers/google/cloud/links/base.py | 2 +-
airflow/providers/google/cloud/links/datafusion.py | 2 +-
airflow/providers/google/cloud/links/dataproc.py | 2 +-
.../providers/google/cloud/operators/bigquery.py | 2 +-
.../google/cloud/operators/dataproc_metastore.py | 2 +-
.../microsoft/azure/operators/data_factory.py | 2 +-
airflow/providers/qubole/operators/qubole.py | 2 +-
docs/apache-airflow/public-airflow-interface.rst | 15 ++++++
docs/conf.py | 1 +
tests/executors/test_kubernetes_executor.py | 2 +-
tests/jobs/test_backfill_job.py | 2 +-
tests/models/test_xcom.py | 3 +-
tests/test_utils/mock_executor.py | 2 +-
29 files changed, 103 insertions(+), 60 deletions(-)
diff --git a/airflow/executors/base_executor.py
b/airflow/executors/base_executor.py
index 4f9ac0c4f6..f78fa1b193 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -38,7 +38,8 @@ PARALLELISM: int = conf.getint("core", "PARALLELISM")
if TYPE_CHECKING:
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
- from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.models.taskinstancekey import TaskInstanceKey
# Command to execute - list of strings
# the first element is always "airflow".
diff --git a/airflow/executors/celery_executor.py
b/airflow/executors/celery_executor.py
index 0f72df2a51..1094991288 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -56,7 +56,8 @@ from airflow.utils.timeout import timeout
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType,
EventBufferValueType, TaskTuple
- from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.models.taskinstancekey import TaskInstanceKey
# Task instance that is sent over Celery queues
# TaskInstanceKey, Command, queue_name, CallableTask
diff --git a/airflow/executors/celery_kubernetes_executor.py
b/airflow/executors/celery_kubernetes_executor.py
index c7bb8df62a..a4ab1bb804 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -28,7 +28,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType,
EventBufferValueType, QueuedTaskInstanceType
- from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance,
TaskInstanceKey
+ from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
+ from airflow.models.taskinstancekey import TaskInstanceKey
class CeleryKubernetesExecutor(LoggingMixin):
diff --git a/airflow/executors/dask_executor.py
b/airflow/executors/dask_executor.py
index c152abe4fa..1998e7c7df 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -36,7 +36,7 @@ from airflow.executors.base_executor import BaseExecutor
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
# queue="default" is a special case since this is the base config default
queue name,
diff --git a/airflow/executors/debug_executor.py
b/airflow/executors/debug_executor.py
index 59b76d6937..ca23b09a67 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -32,7 +32,8 @@ from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import State
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.models.taskinstancekey import TaskInstanceKey
class DebugExecutor(BaseExecutor):
diff --git a/airflow/executors/kubernetes_executor.py
b/airflow/executors/kubernetes_executor.py
index 80091aa827..72ed57a24e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -54,7 +54,7 @@ from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
# TaskInstance key, command, configuration, pod_template_file
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
diff --git a/airflow/executors/local_executor.py
b/airflow/executors/local_executor.py
index 6a9bb1a339..df11d49ae4 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -43,7 +43,8 @@ from airflow.utils.state import State
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstanceKey,
TaskInstanceStateType
+ from airflow.models.taskinstance import TaskInstanceStateType
+ from airflow.models.taskinstancekey import TaskInstanceKey
# This is a work to be executed by a worker.
# It can Key and Command - but it can also be None, None which is actually
a
diff --git a/airflow/executors/sequential_executor.py
b/airflow/executors/sequential_executor.py
index b3da5af080..28f88c6b87 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -32,7 +32,7 @@ from airflow.utils.state import State
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
class SequentialExecutor(BaseExecutor):
diff --git a/airflow/kubernetes/kubernetes_helper_functions.py
b/airflow/kubernetes/kubernetes_helper_functions.py
index 7965e405ac..390eb0edb7 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -23,7 +23,7 @@ import string
import pendulum
from slugify import slugify
-from airflow.models.taskinstance import TaskInstanceKey
+from airflow.models.taskinstancekey import TaskInstanceKey
log = logging.getLogger(__name__)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 19e1681e11..1f0bd6b74c 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -98,7 +98,7 @@ if TYPE_CHECKING:
import jinja2 # Slow import.
from airflow.models.dag import DAG
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom_arg import XComArg
from airflow.utils.task_group import TaskGroup
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index fa1e50b191..5ee6b01f82 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -32,7 +32,7 @@ from enum import Enum
from functools import partial
from pathlib import PurePath
from types import TracebackType
-from typing import TYPE_CHECKING, Any, Callable, Collection, Generator,
Iterable, NamedTuple, Tuple
+from typing import TYPE_CHECKING, Any, Callable, Collection, Generator,
Iterable, Tuple
from urllib.parse import quote
import dill
@@ -92,6 +92,7 @@ from airflow.models.log import Log
from airflow.models.mappedoperator import MappedOperator
from airflow.models.param import process_params
from airflow.models.taskfail import TaskFail
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import LazyXComAccess, XCom
@@ -343,40 +344,6 @@ def _is_mappable_value(value: Any) ->
TypeGuard[Collection]:
return True
-class TaskInstanceKey(NamedTuple):
- """Key used to identify task instance."""
-
- dag_id: str
- task_id: str
- run_id: str
- try_number: int = 1
- map_index: int = -1
-
- @property
- def primary(self) -> tuple[str, str, str, int]:
- """Return task instance primary key part of the key"""
- return self.dag_id, self.task_id, self.run_id, self.map_index
-
- @property
- def reduced(self) -> TaskInstanceKey:
- """Remake the key by subtracting 1 from try number to match in memory
information"""
- return TaskInstanceKey(
- self.dag_id, self.task_id, self.run_id, max(1, self.try_number -
1), self.map_index
- )
-
- def with_try_number(self, try_number: int) -> TaskInstanceKey:
- """Returns TaskInstanceKey with provided ``try_number``"""
- return TaskInstanceKey(self.dag_id, self.task_id, self.run_id,
try_number, self.map_index)
-
- @property
- def key(self) -> TaskInstanceKey:
- """For API-compatibly with TaskInstance.
-
- Returns self
- """
- return self
-
-
def _creator_note(val):
"""Custom creator for the ``note`` association proxy."""
if isinstance(val, str):
diff --git a/airflow/models/taskinstancekey.py
b/airflow/models/taskinstancekey.py
new file mode 100644
index 0000000000..b34aaffd85
--- /dev/null
+++ b/airflow/models/taskinstancekey.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import NamedTuple
+
+
+class TaskInstanceKey(NamedTuple):
+ """Key used to identify task instance."""
+
+ dag_id: str
+ task_id: str
+ run_id: str
+ try_number: int = 1
+ map_index: int = -1
+
+ @property
+ def primary(self) -> tuple[str, str, str, int]:
+ """Return task instance primary key part of the key"""
+ return self.dag_id, self.task_id, self.run_id, self.map_index
+
+ @property
+ def reduced(self) -> TaskInstanceKey:
+ """Remake the key by subtracting 1 from try number to match in memory
information"""
+ return TaskInstanceKey(
+ self.dag_id, self.task_id, self.run_id, max(1, self.try_number -
1), self.map_index
+ )
+
+ def with_try_number(self, try_number: int) -> TaskInstanceKey:
+ """Returns TaskInstanceKey with provided ``try_number``"""
+ return TaskInstanceKey(self.dag_id, self.task_id, self.run_id,
try_number, self.map_index)
+
+ @property
+ def key(self) -> TaskInstanceKey:
+ """For API-compatibly with TaskInstance.
+
+ Returns self
+ """
+ return self
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 476b170deb..f9e5abc774 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -68,7 +68,7 @@ from airflow.utils.xcom import (
log = logging.getLogger(__name__)
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
class BaseXCom(Base, LoggingMixin):
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index b0115636c3..f3560bffc7 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -46,7 +46,7 @@ XCOM_RUN_ID = "trigger_run_id"
if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
class TriggerDagRunLink(BaseOperatorLink):
diff --git a/airflow/providers/amazon/aws/links/base_aws.py
b/airflow/providers/amazon/aws/links/base_aws.py
index fba2f17e96..82acd337bf 100644
--- a/airflow/providers/amazon/aws/links/base_aws.py
+++ b/airflow/providers/amazon/aws/links/base_aws.py
@@ -23,7 +23,7 @@ from airflow.models import BaseOperatorLink, XCom
if TYPE_CHECKING:
from airflow.models import BaseOperator
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
diff --git a/airflow/providers/databricks/operators/databricks.py
b/airflow/providers/databricks/operators/databricks.py
index 0d7ae55cf6..bfeb1c42ba 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -31,7 +31,7 @@ from airflow.providers.databricks.triggers.databricks import
DatabricksExecution
from airflow.providers.databricks.utils.databricks import
normalise_json_content, validate_trigger_event
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
DEFER_METHOD_NAME = "execute_complete"
diff --git a/airflow/providers/google/cloud/links/base.py
b/airflow/providers/google/cloud/links/base.py
index 755266758e..c898f9890e 100644
--- a/airflow/providers/google/cloud/links/base.py
+++ b/airflow/providers/google/cloud/links/base.py
@@ -23,7 +23,7 @@ from airflow.models import BaseOperatorLink, XCom
if TYPE_CHECKING:
from airflow.models import BaseOperator
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
BASE_LINK = "https://console.cloud.google.com"
diff --git a/airflow/providers/google/cloud/links/datafusion.py
b/airflow/providers/google/cloud/links/datafusion.py
index ae1d9d4ada..fae57f97fd 100644
--- a/airflow/providers/google/cloud/links/datafusion.py
+++ b/airflow/providers/google/cloud/links/datafusion.py
@@ -24,7 +24,7 @@ from airflow.models import BaseOperatorLink, XCom
if TYPE_CHECKING:
from airflow.models import BaseOperator
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
diff --git a/airflow/providers/google/cloud/links/dataproc.py
b/airflow/providers/google/cloud/links/dataproc.py
index 573621aa14..4376d41cb0 100644
--- a/airflow/providers/google/cloud/links/dataproc.py
+++ b/airflow/providers/google/cloud/links/dataproc.py
@@ -25,7 +25,7 @@ from airflow.providers.google.cloud.links.base import
BASE_LINK
if TYPE_CHECKING:
from airflow.models import BaseOperator
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
DATAPROC_BASE_LINK = BASE_LINK + "/dataproc"
diff --git a/airflow/providers/google/cloud/operators/bigquery.py
b/airflow/providers/google/cloud/operators/bigquery.py
index b117ed948d..45c2e36c46 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -52,7 +52,7 @@ from airflow.providers.google.cloud.triggers.bigquery import (
)
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py
b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index 3512483434..c7fe6fccf3 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -37,7 +37,7 @@ from airflow.providers.google.cloud.operators.cloud_base
import GoogleCloudBaseO
from airflow.providers.google.common.links.storage import StorageLink
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
diff --git a/airflow/providers/microsoft/azure/operators/data_factory.py
b/airflow/providers/microsoft/azure/operators/data_factory.py
index 1ad3723950..e1fdc997ca 100644
--- a/airflow/providers/microsoft/azure/operators/data_factory.py
+++ b/airflow/providers/microsoft/azure/operators/data_factory.py
@@ -33,7 +33,7 @@ from airflow.providers.microsoft.azure.triggers.data_factory
import AzureDataFac
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
diff --git a/airflow/providers/qubole/operators/qubole.py
b/airflow/providers/qubole/operators/qubole.py
index 710387663f..b7547eabb8 100644
--- a/airflow/providers/qubole/operators/qubole.py
+++ b/airflow/providers/qubole/operators/qubole.py
@@ -33,7 +33,7 @@ from airflow.providers.qubole.hooks.qubole import (
if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
+ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
diff --git a/docs/apache-airflow/public-airflow-interface.rst
b/docs/apache-airflow/public-airflow-interface.rst
index 6d36de5df1..74d8657244 100644
--- a/docs/apache-airflow/public-airflow-interface.rst
+++ b/docs/apache-airflow/public-airflow-interface.rst
@@ -151,6 +151,21 @@ passed to the execute method of the operators via the
:class:`~airflow.models.ta
_api/airflow/models/taskinstance/index
+Task Instance Keys
+------------------
+
+Task instance keys are unique identifiers of task instances in a DAG (in a DAG
Run). A key is a tuple that consists of
+``dag_id``, ``task_id``, ``run_id``, ``try_number``, and ``map_index``. The
key of a task instance can be retrieved via
+:meth:`~airflow.models.taskinstance.TaskInstance.key`.
+
+.. toctree::
+ :includehidden:
+ :glob:
+ :maxdepth: 1
+
+ _api/airflow/models/taskinstancekey/index
+
+
Hooks
-----
diff --git a/docs/conf.py b/docs/conf.py
index 5e0f81474a..e77fc22f99 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -246,6 +246,7 @@ if PACKAGE_NAME == "apache-airflow":
"dagbag.py",
"param.py",
"taskinstance.py",
+ "taskinstancekey.py",
"variable.py",
"xcom.py",
}
diff --git a/tests/executors/test_kubernetes_executor.py
b/tests/executors/test_kubernetes_executor.py
index 3233b75001..24346f08cb 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -32,7 +32,7 @@ from urllib3 import HTTPResponse
from airflow import AirflowException
from airflow.exceptions import PodReconciliationError
-from airflow.models.taskinstance import TaskInstanceKey
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 37ed6f8a85..6bd4a1ab55 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -45,7 +45,7 @@ from airflow.listeners.listener import get_listener_manager
from airflow.models import DagBag, Pool, TaskInstance as TI
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstanceKey
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.taskmap import TaskMap
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 6abdd344ac..a211a9324d 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -27,7 +27,8 @@ from sqlalchemy.orm import Session
from airflow.configuration import conf
from airflow.models.dagrun import DagRun, DagRunType
-from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.settings import json
diff --git a/tests/test_utils/mock_executor.py
b/tests/test_utils/mock_executor.py
index 43d792dc8d..96d42152c2 100644
--- a/tests/test_utils/mock_executor.py
+++ b/tests/test_utils/mock_executor.py
@@ -21,7 +21,7 @@ from collections import defaultdict
from unittest.mock import MagicMock
from airflow.executors.base_executor import BaseExecutor
-from airflow.models.taskinstance import TaskInstanceKey
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.session import create_session
from airflow.utils.state import State