This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2090656e114 Exploring possibility of moving DagAttributeTypes to
execution API spec (#61251)
2090656e114 is described below
commit 2090656e11464e6dc9aa556d7408c8a8ad69c408
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Feb 4 17:43:09 2026 +0530
Exploring possibility of moving DagAttributeTypes to execution API spec
(#61251)
---
airflow-core/src/airflow/api_fastapi/execution_api/app.py | 6 ++++++
task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 5 +++++
task-sdk/src/airflow/sdk/bases/operator.py | 4 ++--
task-sdk/src/airflow/sdk/definitions/_internal/node.py | 2 +-
task-sdk/src/airflow/sdk/definitions/mappedoperator.py | 2 +-
task-sdk/src/airflow/sdk/definitions/taskgroup.py | 4 ++--
6 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
index 9d93f3bf84d..ac0d8012a90 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py
@@ -265,6 +265,7 @@ def get_extra_schemas() -> dict[str, dict]:
"""Get all the extra schemas that are not part of the main FastAPI app."""
from airflow.api_fastapi.execution_api.datamodels.taskinstance import
TaskInstance
from airflow.executors.workloads import BundleInfo
+ from airflow.serialization.enums import DagAttributeTypes
from airflow.task.trigger_rule import TriggerRule
from airflow.task.weight_rule import WeightRule
from airflow.utils.state import TaskInstanceState, TerminalTIState
@@ -278,6 +279,11 @@ def get_extra_schemas() -> dict[str, dict]:
"TaskInstanceState": {"type": "string", "enum":
list(TaskInstanceState)},
"WeightRule": {"type": "string", "enum": list(WeightRule)},
"TriggerRule": {"type": "string", "enum": list(TriggerRule)},
+ "DagAttributeTypes": {
+ "type": "string",
+ "enum": [DagAttributeTypes.OP.value,
DagAttributeTypes.TASK_GROUP.value],
+ "x-enum-varnames": [DagAttributeTypes.OP.name,
DagAttributeTypes.TASK_GROUP.name],
+ },
}
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 149aa5d7dcb..e72cbae4f0e 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -485,6 +485,11 @@ class TriggerRule(str, Enum):
ALL_SKIPPED = "all_skipped"
+class DagAttributeTypes(str, Enum):
+ OP = "operator"
+ TASK_GROUP = "taskgroup"
+
+
class AssetReferenceAssetEventDagRun(BaseModel):
"""
Schema for AssetModel used in AssetEventDagRunReference.
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py
b/task-sdk/src/airflow/sdk/bases/operator.py
index 4b6c9c3f90f..4cabe358d0b 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -89,13 +89,13 @@ if TYPE_CHECKING:
import jinja2
from typing_extensions import Self
+ from airflow.sdk.api.datamodels._generated import DagAttributeTypes
from airflow.sdk.bases.operatorlink import BaseOperatorLink
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.operator_resources import Resources
from airflow.sdk.definitions.taskgroup import TaskGroup
from airflow.sdk.definitions.xcom_arg import XComArg
- from airflow.serialization.enums import DagAttributeTypes
from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.triggers.base import BaseTrigger, StartTriggerArgs
@@ -1554,7 +1554,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:
"""Serialize; required by DAGNode."""
- from airflow.serialization.enums import DagAttributeTypes
+ from airflow.sdk.api.datamodels._generated import DagAttributeTypes
return DagAttributeTypes.OP, self.task_id
diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/node.py
b/task-sdk/src/airflow/sdk/definitions/_internal/node.py
index b2cb651efe1..4ee812b103d 100644
--- a/task-sdk/src/airflow/sdk/definitions/_internal/node.py
+++ b/task-sdk/src/airflow/sdk/definitions/_internal/node.py
@@ -27,11 +27,11 @@ from airflow.sdk._shared.dagnode.node import GenericDAGNode
from airflow.sdk.definitions._internal.mixins import DependencyMixin
if TYPE_CHECKING:
+ from airflow.sdk.api.datamodels._generated import DagAttributeTypes
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.edges import EdgeModifier
from airflow.sdk.definitions.taskgroup import TaskGroup # noqa: F401
from airflow.sdk.types import Operator # noqa: F401
- from airflow.serialization.enums import DagAttributeTypes
KEY_REGEX = re.compile(r"^[\w.-]+$")
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index e77d462e6a0..345973e11af 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -27,6 +27,7 @@ import attrs
import methodtools
from lazy_object_proxy import Proxy
+from airflow.sdk.api.datamodels._generated import DagAttributeTypes
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.definitions._internal.abstractoperator import (
DEFAULT_EXECUTOR,
@@ -50,7 +51,6 @@ from airflow.sdk.definitions._internal.expandinput import (
is_mappable,
)
from airflow.sdk.definitions._internal.types import NOTSET
-from airflow.serialization.enums import DagAttributeTypes
if TYPE_CHECKING:
import datetime
diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
index fcdcd6ab1b9..c7c5da21140 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -36,6 +36,7 @@ from airflow.sdk.exceptions import (
)
if TYPE_CHECKING:
+ from airflow.sdk.api.datamodels._generated import DagAttributeTypes
from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.definitions._internal.abstractoperator import
AbstractOperator
from airflow.sdk.definitions._internal.expandinput import
DictOfListsExpandInput, ListOfDictsExpandInput
@@ -43,7 +44,6 @@ if TYPE_CHECKING:
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.edges import EdgeModifier
from airflow.sdk.types import Operator
- from airflow.serialization.enums import DagAttributeTypes
def _default_parent_group() -> TaskGroup | None:
@@ -493,7 +493,7 @@ class TaskGroup(DAGNode):
def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:
"""Serialize task group; required by DagNode."""
- from airflow.serialization.enums import DagAttributeTypes
+ from airflow.sdk.api.datamodels._generated import DagAttributeTypes
from airflow.serialization.serialized_objects import
TaskGroupSerialization
return (