This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 298f1fd73b5 Resolve circular import issue related to task sdk (#44086)
298f1fd73b5 is described below
commit 298f1fd73b500a70b48267160ad0d6bb258fd205
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Nov 18 08:15:01 2024 -0800
Resolve circular import issue related to task sdk (#44086)
---
airflow/decorators/base.py | 2 +-
airflow/models/abstractoperator.py | 5 +++--
airflow/models/baseoperator.py | 9 +++++----
airflow/models/dag.py | 2 +-
airflow/models/param.py | 2 +-
airflow/models/taskinstance.py | 2 +-
airflow/models/xcom_arg.py | 3 ++-
airflow/serialization/serialized_objects.py | 2 +-
airflow/template/templater.py | 2 +-
airflow/utils/edgemodifier.py | 2 +-
task_sdk/src/airflow/sdk/execution_time/task_runner.py | 2 +-
task_sdk/tests/dags/super_basic.py | 3 ++-
task_sdk/tests/defintions/test_dag.py | 2 +-
13 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 1c9e441190a..3de2f27d04c 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -75,7 +75,7 @@ if TYPE_CHECKING:
OperatorExpandKwargsArgument,
)
from airflow.models.mappedoperator import ValidationSource
- from airflow.sdk import DAG
+ from airflow.sdk.definitions.dag import DAG
from airflow.utils.context import Context
from airflow.utils.task_group import TaskGroup
diff --git a/airflow/models/abstractoperator.py
b/airflow/models/abstractoperator.py
index 58c2aec6fde..d8a000352f4 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -50,7 +50,8 @@ if TYPE_CHECKING:
from airflow.models.dag import DAG as SchedulerDAG
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
- from airflow.sdk import DAG, BaseOperator
+ from airflow.sdk.definitions.baseoperator import BaseOperator
+ from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.node import DAGNode
from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.triggers.base import StartTriggerArgs
@@ -460,7 +461,7 @@ class AbstractOperator(Templater, TaskSDKAbstractOperator):
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
- from airflow.sdk import BaseOperator
+ from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.settings import task_instance_mutation_hook
if not isinstance(self, (BaseOperator, MappedOperator)):
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 520060b8b8f..108d9e51d75 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -74,14 +74,15 @@ from airflow.models.base import _sentinel
from airflow.models.mappedoperator import OperatorPartial,
validate_mapping_kwargs
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.models.taskmixin import DependencyMixin
-
-# Keeping this file at all is a temp thing as we migrate the repo to the task
sdk as the base, but to keep
-# main working and useful for others to develop against we use the TaskSDK
here but keep this file around
-from airflow.sdk import DAG, BaseOperator as TaskSDKBaseOperator, EdgeModifier
as TaskSDKEdgeModifier
from airflow.sdk.definitions.baseoperator import (
BaseOperatorMeta as TaskSDKBaseOperatorMeta,
get_merged_defaults,
)
+
+# Keeping this file at all is a temp thing as we migrate the repo to the task
sdk as the base, but to keep
+# main working and useful for others to develop against we use the TaskSDK
here but keep this file around
+from airflow.sdk.definitions.dag import DAG, BaseOperator as
TaskSDKBaseOperator
+from airflow.sdk.definitions.edges import EdgeModifier as TaskSDKEdgeModifier
from airflow.serialization.enums import DagAttributeTypes
from airflow.ti_deps.deps.mapped_task_upstream_dep import MappedTaskUpstreamDep
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a4d1a2b0eda..ff4eac87b46 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -94,7 +94,7 @@ from airflow.models.taskinstance import (
clear_task_instances,
)
from airflow.models.tasklog import LogTemplate
-from airflow.sdk import DAG as TaskSDKDag, dag as task_sdk_dag_decorator
+from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as
task_sdk_dag_decorator
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
diff --git a/airflow/models/param.py b/airflow/models/param.py
index 28253a86f5c..55b4ad0f2b6 100644
--- a/airflow/models/param.py
+++ b/airflow/models/param.py
@@ -29,7 +29,7 @@ from airflow.utils.types import NOTSET, ArgNotSet
if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
- from airflow.sdk import DAG
+ from airflow.sdk.definitions.dag import DAG
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.utils.context import Context
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 701c01219d2..521077ea761 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -163,7 +163,7 @@ if TYPE_CHECKING:
from airflow.models.dag import DAG as SchedulerDAG, DagModel
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
- from airflow.sdk import DAG
+ from airflow.sdk.definitions.dag import DAG
from airflow.serialization.pydantic.asset import AssetEventPydantic
from airflow.serialization.pydantic.dag import DagModelPydantic
from airflow.serialization.pydantic.taskinstance import
TaskInstancePydantic
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 4e10fe80054..bd8f4555b50 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -42,7 +42,8 @@ if TYPE_CHECKING:
from sqlalchemy.orm import Session
from airflow.models.operator import Operator
- from airflow.sdk import DAG, BaseOperator
+ from airflow.sdk.definitions.baseoperator import BaseOperator
+ from airflow.sdk.definitions.dag import DAG
from airflow.utils.context import Context
from airflow.utils.edgemodifier import EdgeModifier
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 12adf6a7f50..cb4a35c2aec 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -60,7 +60,7 @@ from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.tasklog import LogTemplate
from airflow.models.xcom_arg import XComArg, deserialize_xcom_arg,
serialize_xcom_arg
from airflow.providers_manager import ProvidersManager
-from airflow.sdk import BaseOperator as TaskSDKBaseOperator
+from airflow.sdk.definitions.baseoperator import BaseOperator as
TaskSDKBaseOperator
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.helpers import serialize_template_field
diff --git a/airflow/template/templater.py b/airflow/template/templater.py
index 70be1013649..ff0a4e51c19 100644
--- a/airflow/template/templater.py
+++ b/airflow/template/templater.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
import jinja2
from airflow.models.operator import Operator
- from airflow.sdk import DAG
+ from airflow.sdk.definitions.dag import DAG
from airflow.utils.context import Context
diff --git a/airflow/utils/edgemodifier.py b/airflow/utils/edgemodifier.py
index b4a7a3d0946..949511c3b40 100644
--- a/airflow/utils/edgemodifier.py
+++ b/airflow/utils/edgemodifier.py
@@ -23,7 +23,7 @@ import airflow.sdk
if TYPE_CHECKING:
from airflow.typing_compat import TypeAlias
-EdgeModifier: TypeAlias = airflow.sdk.EdgeModifier
+EdgeModifier: TypeAlias = airflow.sdk.definitions.edges.EdgeModifier
# Factory functions
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index a6d7569382b..e00efe4597b 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -28,8 +28,8 @@ import attrs
import structlog
from pydantic import ConfigDict, TypeAdapter
-from airflow.sdk import BaseOperator
from airflow.sdk.api.datamodels._generated import TaskInstance
+from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.execution_time.comms import StartupDetails, ToSupervisor,
ToTask
if TYPE_CHECKING:
diff --git a/task_sdk/tests/dags/super_basic.py
b/task_sdk/tests/dags/super_basic.py
index afd0a9296d5..9a5501185e0 100644
--- a/task_sdk/tests/dags/super_basic.py
+++ b/task_sdk/tests/dags/super_basic.py
@@ -17,7 +17,8 @@
from __future__ import annotations
-from airflow.sdk import BaseOperator, dag
+from airflow.sdk.definitions.baseoperator import BaseOperator
+from airflow.sdk.definitions.dag import dag
@dag()
diff --git a/task_sdk/tests/defintions/test_dag.py
b/task_sdk/tests/defintions/test_dag.py
index b2481a49b6a..d2250194aab 100644
--- a/task_sdk/tests/defintions/test_dag.py
+++ b/task_sdk/tests/defintions/test_dag.py
@@ -221,7 +221,7 @@ class TestDag:
assert "t3" not in partial.task_group.used_group_ids
def test_partial_subset_taskgroup_join_ids(self):
- from airflow.sdk import TaskGroup
+ from airflow.sdk.definitions.taskgroup import TaskGroup
with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag:
start = BaseOperator(task_id="start")