This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 16ebf0bbc7f Introduce BaseTaskInstanceDTO and duplicate it across core
and task-sdk (#67174)
16ebf0bbc7f is described below
commit 16ebf0bbc7f0367bef54e24c904ecad91b96a3ba
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue May 19 20:27:50 2026 +0800
Introduce BaseTaskInstanceDTO and duplicate it across core and task-sdk
(#67174)
---
.pre-commit-config.yaml | 6 +
.../src/airflow/executors/workloads/task.py | 17 ++-
scripts/ci/prek/check_task_instance_dto_sync.py | 125 +++++++++++++++++++++
.../sdk/execution_time/workloads/__init__.py | 23 ++++
.../airflow/sdk/execution_time/workloads/task.py | 53 +++++++++
5 files changed, 221 insertions(+), 3 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 69cbdcd4025..cef17550108 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -502,6 +502,12 @@ repos:
language: python
pass_filenames: false
files:
^dev/registry/registry_tools/types\.py$|^registry/src/_data/types\.json$
+ - id: check-task-instance-dto-sync
+ name: Check BaseTaskInstanceDTO duplicate is in sync between core and task-sdk
+ entry: ./scripts/ci/prek/check_task_instance_dto_sync.py
+ language: python
+ pass_filenames: false
+ files: ^airflow-core/src/airflow/executors/workloads/task\.py$|^task-sdk/src/airflow/sdk/execution_time/workloads/task\.py$
- id: ruff
name: Run 'ruff' for extremely fast Python linting
description: "Run 'ruff' for extremely fast Python linting"
diff --git a/airflow-core/src/airflow/executors/workloads/task.py
b/airflow-core/src/airflow/executors/workloads/task.py
index d05affe4330..9af3f33c10e 100644
--- a/airflow-core/src/airflow/executors/workloads/task.py
+++ b/airflow-core/src/airflow/executors/workloads/task.py
@@ -33,8 +33,14 @@ if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
-class TaskInstanceDTO(BaseModel):
- """Schema for TaskInstance with minimal required fields needed for
Executors and Task SDK."""
+class BaseTaskInstanceDTO(BaseModel):
+ """
+ Base schema for TaskInstance with the minimal fields shared by Executors
and the Task SDK.
+
+ This definition is duplicated in
:mod:`airflow.sdk.execution_time.workloads.task`
+ and the two are kept in sync by the ``check-task-instance-dto-sync`` prek
+ hook. Update both files together.
+ """
id: uuid.UUID
dag_version_id: uuid.UUID
@@ -48,11 +54,16 @@ class TaskInstanceDTO(BaseModel):
queue: str
priority_weight: int
executor_config: dict | None = Field(default=None, exclude=True)
- external_executor_id: str | None = Field(default=None, exclude=True)
parent_context_carrier: dict | None = None
context_carrier: dict | None = None
+
+class TaskInstanceDTO(BaseTaskInstanceDTO):
+ """TaskInstanceDTO with executor-specific ``external_executor_id`` field
and ``key`` property."""
+
+ external_executor_id: str | None = Field(default=None, exclude=True)
+
# TODO: Task-SDK: Can we replace TaskInstanceKey with just the uuid across
the codebase?
@property
def key(self) -> TaskInstanceKey:
diff --git a/scripts/ci/prek/check_task_instance_dto_sync.py
b/scripts/ci/prek/check_task_instance_dto_sync.py
new file mode 100755
index 00000000000..689d35a4d15
--- /dev/null
+++ b/scripts/ci/prek/check_task_instance_dto_sync.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
"""
Verify that the duplicate ``BaseTaskInstanceDTO`` definitions in airflow-core
and task-sdk stay structurally identical.

``BaseTaskInstanceDTO`` is duplicated (not shared) in:

- ``airflow-core/src/airflow/executors/workloads/task.py``
- ``task-sdk/src/airflow/sdk/execution_time/workloads/task.py``

This hook compares the *fields* (annotated assignments) and bases of both
``BaseTaskInstanceDTO`` classes. Fields are compared by name, annotation and
default as normalized by ``ast.unparse``, and in declaration order; comments
and docstrings are not part of the comparison.

The concrete ``TaskInstanceDTO`` subclasses in each file are allowed to differ:
airflow-core adds the executor-specific ``external_executor_id`` field and a
``key`` property that depends on ``airflow.models``, which the Task SDK does
not have access to.
"""

from __future__ import annotations

import ast
import sys
from pathlib import Path

# Repository root: this script lives in scripts/ci/prek/, three directories below it.
AIRFLOW_ROOT = Path(__file__).parents[3].resolve()
# The two files that must contain matching CLASS_NAME definitions.
CORE_FILE = AIRFLOW_ROOT / "airflow-core" / "src" / "airflow" / "executors" / "workloads" / "task.py"
SDK_FILE = AIRFLOW_ROOT / "task-sdk" / "src" / "airflow" / "sdk" / "execution_time" / "workloads" / "task.py"
# Name of the duplicated class that is compared between CORE_FILE and SDK_FILE.
CLASS_NAME = "BaseTaskInstanceDTO"
+
+
+def _find_class(tree: ast.AST, class_name: str) -> ast.ClassDef | None:
+ for node in ast.walk(tree):
+ if isinstance(node, ast.ClassDef) and node.name == class_name:
+ return node
+ return None
+
+
+def _field_signature(class_node: ast.ClassDef) -> list[tuple[str, str, str |
None]]:
+ """Return a normalized list of ``(name, annotation, default)`` for each
field."""
+ fields: list[tuple[str, str, str | None]] = []
+ for stmt in class_node.body:
+ if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target,
ast.Name):
+ name = stmt.target.id
+ annotation = ast.unparse(stmt.annotation)
+ default = ast.unparse(stmt.value) if stmt.value is not None else
None
+ fields.append((name, annotation, default))
+ return fields
+
+
def _bases(class_node: ast.ClassDef) -> list[str]:
    """
    Return the normalized class-header arguments of *class_node*.

    Positional bases come first, followed by class keyword arguments such as
    ``metaclass=...``, pydantic class config (``frozen=True``, ``extra="forbid"``)
    or ``**kwargs``. Each is rendered with ``ast.unparse``. Keywords are included
    because they change the class just as much as its bases do. Ignoring them
    would let e.g. a divergent pydantic config slip through the sync check.

    :param class_node: the class definition to inspect.
    :return: list of source strings, e.g. ``["BaseModel", "frozen=True"]``.
    """
    header = [ast.unparse(base) for base in class_node.bases]
    # ast.unparse renders a keyword node as "name=value" (or "**value" for ``**kwargs``).
    header.extend(ast.unparse(keyword) for keyword in class_node.keywords)
    return header
+
+
def _extract(file_path: Path) -> tuple[list[str], list[tuple[str, str, str | None]]]:
    """
    Return ``(bases, fields)`` of the ``CLASS_NAME`` class defined in *file_path*.

    ``bases`` comes from ``_bases`` and ``fields`` from ``_field_signature``.
    If the file cannot be read, or does not define ``CLASS_NAME``, an ``ERROR:``
    line is printed to stderr and the process exits with status 1.

    :param file_path: Python source file to parse.
    """
    try:
        # Hand the raw bytes to the parser so it decodes them per PEP 263 (UTF-8
        # by default, honouring a coding cookie or BOM). Path.read_text() without
        # an encoding would instead depend on the locale.
        source = file_path.read_bytes()
    except OSError as err:
        # E.g. one of the two files was moved or deleted: report it in the same
        # style as the other failures instead of dumping a traceback.
        print(f"ERROR: Could not read {file_path}: {err}", file=sys.stderr)
        sys.exit(1)
    tree = ast.parse(source, filename=str(file_path))
    class_node = _find_class(tree, CLASS_NAME)
    if class_node is None:
        print(f"ERROR: Could not find class {CLASS_NAME} in {file_path}", file=sys.stderr)
        sys.exit(1)
    return _bases(class_node), _field_signature(class_node)
+
+
def _report_field_differences(
    core_fields: list[tuple[str, str, str | None]],
    sdk_fields: list[tuple[str, str, str | None]],
) -> None:
    """
    Print to stderr why two ``_field_signature`` lists differ.

    Reports fields that exist on only one side and fields whose annotation or
    default differ. If none of those apply, the lists can only differ in
    declaration order or in a repeated field name. In that case both name
    sequences are printed, so a mismatch never produces an empty report.
    """
    core_by_name = {field[0]: field for field in core_fields}
    sdk_by_name = {field[0]: field for field in sdk_fields}
    only_in_core = sorted(core_by_name.keys() - sdk_by_name.keys())
    only_in_sdk = sorted(sdk_by_name.keys() - core_by_name.keys())
    differing = sorted(
        name
        for name in core_by_name.keys() & sdk_by_name.keys()
        if core_by_name[name] != sdk_by_name[name]
    )
    if only_in_core:
        print(f"\n Fields only in airflow-core: {only_in_core}", file=sys.stderr)
    if only_in_sdk:
        print(f"\n Fields only in task-sdk: {only_in_sdk}", file=sys.stderr)
    for name in differing:
        print(
            f"\n Field {name!r} differs:"
            f"\n airflow-core: {core_by_name[name]}"
            f"\n task-sdk: {sdk_by_name[name]}",
            file=sys.stderr,
        )
    if not (only_in_core or only_in_sdk or differing):
        # The per-name views are identical, yet the ordered lists are not equal.
        # Declaration order (or a duplicated name) is the only remaining
        # difference, so show both sequences.
        print(
            "\n Field declarations match by name but differ in order or repetition:",
            file=sys.stderr,
        )
        print(f" airflow-core: {[field[0] for field in core_fields]}", file=sys.stderr)
        print(f" task-sdk: {[field[0] for field in sdk_fields]}", file=sys.stderr)


def main() -> None:
    """
    Compare the ``CLASS_NAME`` definitions in ``CORE_FILE`` and ``SDK_FILE``.

    Exits with status 0 when the bases and the field lists are equal, including
    field order. Otherwise prints a report of the differences to stderr and
    exits with status 1.
    """
    core_bases, core_fields = _extract(CORE_FILE)
    sdk_bases, sdk_fields = _extract(SDK_FILE)

    if core_bases == sdk_bases and core_fields == sdk_fields:
        sys.exit(0)

    print(
        f"\nERROR: {CLASS_NAME} definitions in airflow-core and task-sdk are out of sync!",
        file=sys.stderr,
    )
    print(f"\n airflow-core: {CORE_FILE.relative_to(AIRFLOW_ROOT)}", file=sys.stderr)
    print(f" task-sdk: {SDK_FILE.relative_to(AIRFLOW_ROOT)}", file=sys.stderr)

    if core_bases != sdk_bases:
        print("\nClass bases differ:", file=sys.stderr)
        print(f" airflow-core: {core_bases}", file=sys.stderr)
        print(f" task-sdk: {sdk_bases}", file=sys.stderr)

    if core_fields != sdk_fields:
        _report_field_differences(core_fields, sdk_fields)

    print(
        f"\nUpdate both files together so the two {CLASS_NAME} definitions stay in sync.",
        file=sys.stderr,
    )
    sys.exit(1)


if __name__ == "__main__":
    main()
diff --git a/task-sdk/src/airflow/sdk/execution_time/workloads/__init__.py
b/task-sdk/src/airflow/sdk/execution_time/workloads/__init__.py
new file mode 100644
index 00000000000..cdf955e742d
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/execution_time/workloads/__init__.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
"""Workload schemas for Task SDK execution-time communication."""

from __future__ import annotations

from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO

# Only the concrete DTO is re-exported from the package; BaseTaskInstanceDTO
# must be imported from airflow.sdk.execution_time.workloads.task directly.
__all__ = ["TaskInstanceDTO"]
diff --git a/task-sdk/src/airflow/sdk/execution_time/workloads/task.py
b/task-sdk/src/airflow/sdk/execution_time/workloads/task.py
new file mode 100644
index 00000000000..ceff200856f
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/execution_time/workloads/task.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task workload schemas for Task SDK execution-time communication."""
+
+from __future__ import annotations
+
+import uuid
+
+from pydantic import BaseModel, Field
+
+
class BaseTaskInstanceDTO(BaseModel):
    """
    Base schema for TaskInstance with the minimal fields shared by Executors and the Task SDK.

    This class is duplicated in :mod:`airflow.executors.workloads.task` and the
    two definitions are kept in sync by the ``check-task-instance-dto-sync``
    prek hook. Update both files together.

    The hook compares the class bases and every annotated field (name,
    annotation and default) in declaration order. Adding, removing, reordering
    or re-typing a field here without the identical change in airflow-core
    fails the check. Comments and this docstring are not compared.
    """

    id: uuid.UUID
    dag_version_id: uuid.UUID
    task_id: str
    dag_id: str
    run_id: str
    try_number: int
    # NOTE(review): -1 presumably means "not a mapped task instance" -- confirm.
    map_index: int = -1

    pool_slots: int
    queue: str
    priority_weight: int
    # exclude=True: pydantic leaves this field out of model_dump()/JSON output.
    executor_config: dict | None = Field(default=None, exclude=True)

    # NOTE(review): presumably serialized tracing-context carriers -- confirm.
    parent_context_carrier: dict | None = None
    context_carrier: dict | None = None
+
+
class TaskInstanceDTO(BaseTaskInstanceDTO):
    """
    Task SDK TaskInstanceDTO.

    Adds nothing to :class:`BaseTaskInstanceDTO`. Unlike the airflow-core
    ``TaskInstanceDTO``, it has no ``external_executor_id`` field and no ``key``
    property; ``key`` depends on ``airflow.models``, which the Task SDK does not
    have access to.
    """