This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2090656e114 Exploring possibility of moving DagAttributeTypes to execution API spec (#61251)
2090656e114 is described below

commit 2090656e11464e6dc9aa556d7408c8a8ad69c408
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Feb 4 17:43:09 2026 +0530

    Exploring possibility of moving DagAttributeTypes to execution API spec (#61251)
---
 airflow-core/src/airflow/api_fastapi/execution_api/app.py | 6 ++++++
 task-sdk/src/airflow/sdk/api/datamodels/_generated.py     | 5 +++++
 task-sdk/src/airflow/sdk/bases/operator.py                | 4 ++--
 task-sdk/src/airflow/sdk/definitions/_internal/node.py    | 2 +-
 task-sdk/src/airflow/sdk/definitions/mappedoperator.py    | 2 +-
 task-sdk/src/airflow/sdk/definitions/taskgroup.py         | 4 ++--
 6 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index 9d93f3bf84d..ac0d8012a90 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -265,6 +265,7 @@ def get_extra_schemas() -> dict[str, dict]:
     """Get all the extra schemas that are not part of the main FastAPI app."""
     from airflow.api_fastapi.execution_api.datamodels.taskinstance import TaskInstance
     from airflow.executors.workloads import BundleInfo
+    from airflow.serialization.enums import DagAttributeTypes
     from airflow.task.trigger_rule import TriggerRule
     from airflow.task.weight_rule import WeightRule
     from airflow.utils.state import TaskInstanceState, TerminalTIState
@@ -278,6 +279,11 @@ def get_extra_schemas() -> dict[str, dict]:
         "TaskInstanceState": {"type": "string", "enum": list(TaskInstanceState)},
         "WeightRule": {"type": "string", "enum": list(WeightRule)},
         "TriggerRule": {"type": "string", "enum": list(TriggerRule)},
+        "DagAttributeTypes": {
+            "type": "string",
+            "enum": [DagAttributeTypes.OP.value, DagAttributeTypes.TASK_GROUP.value],
+            "x-enum-varnames": [DagAttributeTypes.OP.name, DagAttributeTypes.TASK_GROUP.name],
+        },
     }
 
 
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 149aa5d7dcb..e72cbae4f0e 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -485,6 +485,11 @@ class TriggerRule(str, Enum):
     ALL_SKIPPED = "all_skipped"
 
 
+class DagAttributeTypes(str, Enum):
+    OP = "operator"
+    TASK_GROUP = "taskgroup"
+
+
 class AssetReferenceAssetEventDagRun(BaseModel):
     """
     Schema for AssetModel used in AssetEventDagRunReference.
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py
index 4b6c9c3f90f..4cabe358d0b 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -89,13 +89,13 @@ if TYPE_CHECKING:
     import jinja2
     from typing_extensions import Self
 
+    from airflow.sdk.api.datamodels._generated import DagAttributeTypes
     from airflow.sdk.bases.operatorlink import BaseOperatorLink
     from airflow.sdk.definitions.context import Context
     from airflow.sdk.definitions.dag import DAG
     from airflow.sdk.definitions.operator_resources import Resources
     from airflow.sdk.definitions.taskgroup import TaskGroup
     from airflow.sdk.definitions.xcom_arg import XComArg
-    from airflow.serialization.enums import DagAttributeTypes
     from airflow.task.priority_strategy import PriorityWeightStrategy
     from airflow.triggers.base import BaseTrigger, StartTriggerArgs
 
@@ -1554,7 +1554,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
 
     def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:
         """Serialize; required by DAGNode."""
-        from airflow.serialization.enums import DagAttributeTypes
+        from airflow.sdk.api.datamodels._generated import DagAttributeTypes
 
         return DagAttributeTypes.OP, self.task_id
 
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/node.py b/task-sdk/src/airflow/sdk/definitions/_internal/node.py
index b2cb651efe1..4ee812b103d 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/node.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/node.py
@@ -27,11 +27,11 @@ from airflow.sdk._shared.dagnode.node import GenericDAGNode
 from airflow.sdk.definitions._internal.mixins import DependencyMixin
 
 if TYPE_CHECKING:
+    from airflow.sdk.api.datamodels._generated import DagAttributeTypes
     from airflow.sdk.definitions.dag import DAG
     from airflow.sdk.definitions.edges import EdgeModifier
     from airflow.sdk.definitions.taskgroup import TaskGroup  # noqa: F401
     from airflow.sdk.types import Operator  # noqa: F401
-    from airflow.serialization.enums import DagAttributeTypes
 
 
 KEY_REGEX = re.compile(r"^[\w.-]+$")
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index e77d462e6a0..345973e11af 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -27,6 +27,7 @@ import attrs
 import methodtools
 from lazy_object_proxy import Proxy
 
+from airflow.sdk.api.datamodels._generated import DagAttributeTypes
 from airflow.sdk.bases.xcom import BaseXCom
 from airflow.sdk.definitions._internal.abstractoperator import (
     DEFAULT_EXECUTOR,
@@ -50,7 +51,6 @@ from airflow.sdk.definitions._internal.expandinput import (
     is_mappable,
 )
 from airflow.sdk.definitions._internal.types import NOTSET
-from airflow.serialization.enums import DagAttributeTypes
 
 if TYPE_CHECKING:
     import datetime
diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
index fcdcd6ab1b9..c7c5da21140 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -36,6 +36,7 @@ from airflow.sdk.exceptions import (
 )
 
 if TYPE_CHECKING:
+    from airflow.sdk.api.datamodels._generated import DagAttributeTypes
     from airflow.sdk.bases.operator import BaseOperator
     from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
     from airflow.sdk.definitions._internal.expandinput import DictOfListsExpandInput, ListOfDictsExpandInput
@@ -43,7 +44,6 @@ if TYPE_CHECKING:
     from airflow.sdk.definitions.dag import DAG
     from airflow.sdk.definitions.edges import EdgeModifier
     from airflow.sdk.types import Operator
-    from airflow.serialization.enums import DagAttributeTypes
 
 
 def _default_parent_group() -> TaskGroup | None:
@@ -493,7 +493,7 @@ class TaskGroup(DAGNode):
 
     def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:
         """Serialize task group; required by DagNode."""
-        from airflow.serialization.enums import DagAttributeTypes
+        from airflow.sdk.api.datamodels._generated import DagAttributeTypes
         from airflow.serialization.serialized_objects import TaskGroupSerialization
 
         return (

Reply via email to