This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 298f1fd73b5 Resolve circular import issue related to task sdk (#44086)
298f1fd73b5 is described below

commit 298f1fd73b500a70b48267160ad0d6bb258fd205
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Nov 18 08:15:01 2024 -0800

    Resolve circular import issue related to task sdk (#44086)
---
 airflow/decorators/base.py                             | 2 +-
 airflow/models/abstractoperator.py                     | 5 +++--
 airflow/models/baseoperator.py                         | 9 +++++----
 airflow/models/dag.py                                  | 2 +-
 airflow/models/param.py                                | 2 +-
 airflow/models/taskinstance.py                         | 2 +-
 airflow/models/xcom_arg.py                             | 3 ++-
 airflow/serialization/serialized_objects.py            | 2 +-
 airflow/template/templater.py                          | 2 +-
 airflow/utils/edgemodifier.py                          | 2 +-
 task_sdk/src/airflow/sdk/execution_time/task_runner.py | 2 +-
 task_sdk/tests/dags/super_basic.py                     | 3 ++-
 task_sdk/tests/defintions/test_dag.py                  | 2 +-
 13 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
index 1c9e441190a..3de2f27d04c 100644
--- a/airflow/decorators/base.py
+++ b/airflow/decorators/base.py
@@ -75,7 +75,7 @@ if TYPE_CHECKING:
         OperatorExpandKwargsArgument,
     )
     from airflow.models.mappedoperator import ValidationSource
-    from airflow.sdk import DAG
+    from airflow.sdk.definitions.dag import DAG
     from airflow.utils.context import Context
     from airflow.utils.task_group import TaskGroup
 
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index 58c2aec6fde..d8a000352f4 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -50,7 +50,8 @@ if TYPE_CHECKING:
     from airflow.models.dag import DAG as SchedulerDAG
     from airflow.models.mappedoperator import MappedOperator
     from airflow.models.taskinstance import TaskInstance
-    from airflow.sdk import DAG, BaseOperator
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
     from airflow.sdk.definitions.node import DAGNode
     from airflow.task.priority_strategy import PriorityWeightStrategy
     from airflow.triggers.base import StartTriggerArgs
@@ -460,7 +461,7 @@ class AbstractOperator(Templater, TaskSDKAbstractOperator):
 
         from airflow.models.mappedoperator import MappedOperator
         from airflow.models.taskinstance import TaskInstance
-        from airflow.sdk import BaseOperator
+        from airflow.sdk.definitions.baseoperator import BaseOperator
         from airflow.settings import task_instance_mutation_hook
 
         if not isinstance(self, (BaseOperator, MappedOperator)):
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 520060b8b8f..108d9e51d75 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -74,14 +74,15 @@ from airflow.models.base import _sentinel
 from airflow.models.mappedoperator import OperatorPartial, validate_mapping_kwargs
 from airflow.models.taskinstance import TaskInstance, clear_task_instances
 from airflow.models.taskmixin import DependencyMixin
-
-# Keeping this file at all is a temp thing as we migrate the repo to the task sdk as the base, but to keep
-# main working and useful for others to develop against we use the TaskSDK here but keep this file around
-from airflow.sdk import DAG, BaseOperator as TaskSDKBaseOperator, EdgeModifier as TaskSDKEdgeModifier
 from airflow.sdk.definitions.baseoperator import (
     BaseOperatorMeta as TaskSDKBaseOperatorMeta,
     get_merged_defaults,
 )
+
+# Keeping this file at all is a temp thing as we migrate the repo to the task sdk as the base, but to keep
+# main working and useful for others to develop against we use the TaskSDK here but keep this file around
+from airflow.sdk.definitions.dag import DAG, BaseOperator as TaskSDKBaseOperator
+from airflow.sdk.definitions.edges import EdgeModifier as TaskSDKEdgeModifier
 from airflow.serialization.enums import DagAttributeTypes
 from airflow.ti_deps.deps.mapped_task_upstream_dep import MappedTaskUpstreamDep
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a4d1a2b0eda..ff4eac87b46 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -94,7 +94,7 @@ from airflow.models.taskinstance import (
     clear_task_instances,
 )
 from airflow.models.tasklog import LogTemplate
-from airflow.sdk import DAG as TaskSDKDag, dag as task_sdk_dag_decorator
+from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
 from airflow.security import permissions
 from airflow.settings import json
diff --git a/airflow/models/param.py b/airflow/models/param.py
index 28253a86f5c..55b4ad0f2b6 100644
--- a/airflow/models/param.py
+++ b/airflow/models/param.py
@@ -29,7 +29,7 @@ from airflow.utils.types import NOTSET, ArgNotSet
 if TYPE_CHECKING:
     from airflow.models.dagrun import DagRun
     from airflow.models.operator import Operator
-    from airflow.sdk import DAG
+    from airflow.sdk.definitions.dag import DAG
     from airflow.serialization.pydantic.dag_run import DagRunPydantic
     from airflow.utils.context import Context
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 701c01219d2..521077ea761 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -163,7 +163,7 @@ if TYPE_CHECKING:
     from airflow.models.dag import DAG as SchedulerDAG, DagModel
     from airflow.models.dagrun import DagRun
     from airflow.models.operator import Operator
-    from airflow.sdk import DAG
+    from airflow.sdk.definitions.dag import DAG
     from airflow.serialization.pydantic.asset import AssetEventPydantic
     from airflow.serialization.pydantic.dag import DagModelPydantic
     from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 4e10fe80054..bd8f4555b50 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -42,7 +42,8 @@ if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
     from airflow.models.operator import Operator
-    from airflow.sdk import DAG, BaseOperator
+    from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.dag import DAG
     from airflow.utils.context import Context
     from airflow.utils.edgemodifier import EdgeModifier
 
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 12adf6a7f50..cb4a35c2aec 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -60,7 +60,7 @@ from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.models.tasklog import LogTemplate
 from airflow.models.xcom_arg import XComArg, deserialize_xcom_arg, serialize_xcom_arg
 from airflow.providers_manager import ProvidersManager
-from airflow.sdk import BaseOperator as TaskSDKBaseOperator
+from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.helpers import serialize_template_field
diff --git a/airflow/template/templater.py b/airflow/template/templater.py
index 70be1013649..ff0a4e51c19 100644
--- a/airflow/template/templater.py
+++ b/airflow/template/templater.py
@@ -31,7 +31,7 @@ if TYPE_CHECKING:
     import jinja2
 
     from airflow.models.operator import Operator
-    from airflow.sdk import DAG
+    from airflow.sdk.definitions.dag import DAG
     from airflow.utils.context import Context
 
 
diff --git a/airflow/utils/edgemodifier.py b/airflow/utils/edgemodifier.py
index b4a7a3d0946..949511c3b40 100644
--- a/airflow/utils/edgemodifier.py
+++ b/airflow/utils/edgemodifier.py
@@ -23,7 +23,7 @@ import airflow.sdk
 if TYPE_CHECKING:
     from airflow.typing_compat import TypeAlias
 
-EdgeModifier: TypeAlias = airflow.sdk.EdgeModifier
+EdgeModifier: TypeAlias = airflow.sdk.definitions.edges.EdgeModifier
 
 
 # Factory functions
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index a6d7569382b..e00efe4597b 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -28,8 +28,8 @@ import attrs
 import structlog
 from pydantic import ConfigDict, TypeAdapter
 
-from airflow.sdk import BaseOperator
 from airflow.sdk.api.datamodels._generated import TaskInstance
+from airflow.sdk.definitions.baseoperator import BaseOperator
 from airflow.sdk.execution_time.comms import StartupDetails, ToSupervisor, ToTask
 
 if TYPE_CHECKING:
diff --git a/task_sdk/tests/dags/super_basic.py b/task_sdk/tests/dags/super_basic.py
index afd0a9296d5..9a5501185e0 100644
--- a/task_sdk/tests/dags/super_basic.py
+++ b/task_sdk/tests/dags/super_basic.py
@@ -17,7 +17,8 @@
 
 from __future__ import annotations
 
-from airflow.sdk import BaseOperator, dag
+from airflow.sdk.definitions.baseoperator import BaseOperator
+from airflow.sdk.definitions.dag import dag
 
 
 @dag()
diff --git a/task_sdk/tests/defintions/test_dag.py b/task_sdk/tests/defintions/test_dag.py
index b2481a49b6a..d2250194aab 100644
--- a/task_sdk/tests/defintions/test_dag.py
+++ b/task_sdk/tests/defintions/test_dag.py
@@ -221,7 +221,7 @@ class TestDag:
         assert "t3" not in partial.task_group.used_group_ids
 
     def test_partial_subset_taskgroup_join_ids(self):
-        from airflow.sdk import TaskGroup
+        from airflow.sdk.definitions.taskgroup import TaskGroup
 
         with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag:
             start = BaseOperator(task_id="start")

Reply via email to