This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 16ebf0bbc7f Introduce BaseTaskInstanceDTO and duplicate it across core and task-sdk (#67174)
16ebf0bbc7f is described below

commit 16ebf0bbc7f0367bef54e24c904ecad91b96a3ba
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue May 19 20:27:50 2026 +0800

    Introduce BaseTaskInstanceDTO and duplicate it across core and task-sdk (#67174)
---
 .pre-commit-config.yaml                            |   6 +
 .../src/airflow/executors/workloads/task.py        |  17 ++-
 scripts/ci/prek/check_task_instance_dto_sync.py    | 125 +++++++++++++++++++++
 .../sdk/execution_time/workloads/__init__.py       |  23 ++++
 .../airflow/sdk/execution_time/workloads/task.py   |  53 +++++++++
 5 files changed, 221 insertions(+), 3 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 69cbdcd4025..cef17550108 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -502,6 +502,12 @@ repos:
         language: python
         pass_filenames: false
         files: ^dev/registry/registry_tools/types\.py$|^registry/src/_data/types\.json$
+      - id: check-task-instance-dto-sync
+        name: Check BaseTaskInstanceDTO duplicate is in sync between core and task-sdk
+        entry: ./scripts/ci/prek/check_task_instance_dto_sync.py
+        language: python
+        pass_filenames: false
+        files: ^airflow-core/src/airflow/executors/workloads/task\.py$|^task-sdk/src/airflow/sdk/execution_time/workloads/task\.py$|^scripts/ci/prek/check_task_instance_dto_sync\.py$
       - id: ruff
         name: Run 'ruff' for extremely fast Python linting
         description: "Run 'ruff' for extremely fast Python linting"
diff --git a/airflow-core/src/airflow/executors/workloads/task.py b/airflow-core/src/airflow/executors/workloads/task.py
index d05affe4330..9af3f33c10e 100644
--- a/airflow-core/src/airflow/executors/workloads/task.py
+++ b/airflow-core/src/airflow/executors/workloads/task.py
@@ -33,8 +33,14 @@ if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
 
 
-class TaskInstanceDTO(BaseModel):
-    """Schema for TaskInstance with minimal required fields needed for 
Executors and Task SDK."""
+class BaseTaskInstanceDTO(BaseModel):
+    """
+    Base schema for TaskInstance with the minimal fields shared by Executors 
and the Task SDK.
+
+    This definition is duplicated in 
:mod:`airflow.sdk.execution_time.workloads.task`
+    and the two are kept in sync by the ``check-task-instance-dto-sync`` prek
+    hook. Update both files together.
+    """
 
     id: uuid.UUID
     dag_version_id: uuid.UUID
@@ -48,11 +54,16 @@ class TaskInstanceDTO(BaseModel):
     queue: str
     priority_weight: int
     executor_config: dict | None = Field(default=None, exclude=True)
-    external_executor_id: str | None = Field(default=None, exclude=True)
 
     parent_context_carrier: dict | None = None
     context_carrier: dict | None = None
 
+
+class TaskInstanceDTO(BaseTaskInstanceDTO):
+    """TaskInstanceDTO with executor-specific ``external_executor_id`` field 
and ``key`` property."""
+
+    external_executor_id: str | None = Field(default=None, exclude=True)
+
     # TODO: Task-SDK: Can we replace TaskInstanceKey with just the uuid across 
the codebase?
     @property
     def key(self) -> TaskInstanceKey:
diff --git a/scripts/ci/prek/check_task_instance_dto_sync.py b/scripts/ci/prek/check_task_instance_dto_sync.py
new file mode 100755
index 00000000000..689d35a4d15
--- /dev/null
+++ b/scripts/ci/prek/check_task_instance_dto_sync.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Verify that the duplicate ``BaseTaskInstanceDTO`` definitions in airflow-core
+and task-sdk stay structurally identical.
+
+``BaseTaskInstanceDTO`` is duplicated (not shared) in:
+
+- ``airflow-core/src/airflow/executors/workloads/task.py``
+- ``task-sdk/src/airflow/sdk/execution_time/workloads/task.py``
+
+This hook compares the bases and the *fields* (annotated assignments: name,
+annotation and default, in declaration order) of both ``BaseTaskInstanceDTO``
+classes. Methods, properties and non-annotated assignments are not compared.
+The concrete ``TaskInstanceDTO`` subclasses in each file are allowed to differ:
+airflow-core adds the executor-only ``external_executor_id`` field and a ``key``
+property that depends on ``airflow.models``, which the Task SDK does not have
+access to.
+"""
+
+from __future__ import annotations
+
+import ast
+import sys
+from pathlib import Path
+
+# This script lives in <repo>/scripts/ci/prek/, so parents[3] is the repository root.
+AIRFLOW_ROOT = Path(__file__).parents[3].resolve()
+CORE_FILE = AIRFLOW_ROOT / "airflow-core" / "src" / "airflow" / "executors" / "workloads" / "task.py"
+SDK_FILE = AIRFLOW_ROOT / "task-sdk" / "src" / "airflow" / "sdk" / "execution_time" / "workloads" / "task.py"
+CLASS_NAME = "BaseTaskInstanceDTO"
+
+
+def _find_class(tree: ast.AST, class_name: str) -> ast.ClassDef | None:
+    """Return the first ``ClassDef`` named *class_name* yielded by ``ast.walk(tree)``, else ``None``."""
+    candidates = (
+        node for node in ast.walk(tree) if isinstance(node, ast.ClassDef) and node.name == class_name
+    )
+    return next(candidates, None)
+
+
+def _field_signature(class_node: ast.ClassDef) -> list[tuple[str, str, str | None]]:
+    """
+    Return ``(name, annotation, default)`` for each annotated field of *class_node*.
+
+    Only ``name: annotation [= value]`` statements directly in the class body are
+    considered, in declaration order. ``annotation`` and ``default`` are the
+    ``ast.unparse`` source text; ``default`` is ``None`` when no value is assigned.
+    """
+    return [
+        (
+            stmt.target.id,
+            ast.unparse(stmt.annotation),
+            None if stmt.value is None else ast.unparse(stmt.value),
+        )
+        for stmt in class_node.body
+        if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name)
+    ]
+
+
+def _bases(class_node: ast.ClassDef) -> list[str]:
+    """Return the source text of each base-class expression of *class_node*, in order."""
+    return list(map(ast.unparse, class_node.bases))
+
+
+def _extract(file_path: Path) -> tuple[list[str], list[tuple[str, str, str | None]]]:
+    """
+    Parse *file_path* and return the bases and field signature of ``CLASS_NAME``.
+
+    Prints an error to stderr and exits with status 1 if the class is not found.
+    """
+    # Python sources default to UTF-8 (PEP 3120); read them as such instead of relying
+    # on the platform-dependent locale encoding, which can mis-decode on e.g. Windows.
+    source = file_path.read_text(encoding="utf-8")
+    tree = ast.parse(source, filename=str(file_path))
+    class_node = _find_class(tree, CLASS_NAME)
+    if class_node is None:
+        print(f"ERROR: Could not find class {CLASS_NAME} in {file_path}", file=sys.stderr)
+        sys.exit(1)
+    return _bases(class_node), _field_signature(class_node)
+
+
+def main() -> None:
+    """
+    Compare both ``BaseTaskInstanceDTO`` definitions and exit with the result.
+
+    Exits with status 0 when bases and fields match exactly. Otherwise prints a
+    report of every detected mismatch to stderr and exits with status 1.
+    """
+    core_bases, core_fields = _extract(CORE_FILE)
+    sdk_bases, sdk_fields = _extract(SDK_FILE)
+
+    if core_bases == sdk_bases and core_fields == sdk_fields:
+        sys.exit(0)
+
+    print(
+        f"\nERROR: {CLASS_NAME} definitions in airflow-core and task-sdk are out of sync!",
+        file=sys.stderr,
+    )
+    print(f"\n  airflow-core: {CORE_FILE.relative_to(AIRFLOW_ROOT)}", file=sys.stderr)
+    print(f"  task-sdk:     {SDK_FILE.relative_to(AIRFLOW_ROOT)}", file=sys.stderr)
+
+    if core_bases != sdk_bases:
+        print("\nClass bases differ:", file=sys.stderr)
+        print(f"  airflow-core: {core_bases}", file=sys.stderr)
+        print(f"  task-sdk:     {sdk_bases}", file=sys.stderr)
+
+    if core_fields != sdk_fields:
+        core_set = {f[0]: f for f in core_fields}
+        sdk_set = {f[0]: f for f in sdk_fields}
+        common = set(core_set) & set(sdk_set)
+        only_in_core = sorted(set(core_set) - set(sdk_set))
+        only_in_sdk = sorted(set(sdk_set) - set(core_set))
+        differing = sorted(name for name in common if core_set[name] != sdk_set[name])
+        if only_in_core:
+            print(f"\n  Fields only in airflow-core: {only_in_core}", file=sys.stderr)
+        if only_in_sdk:
+            print(f"\n  Fields only in task-sdk: {only_in_sdk}", file=sys.stderr)
+        for name in differing:
+            print(
+                f"\n  Field {name!r} differs:"
+                f"\n    airflow-core: {core_set[name]}"
+                f"\n    task-sdk:     {sdk_set[name]}",
+                file=sys.stderr,
+            )
+        # The field lists are compared in declaration order, so a pure reordering (or a
+        # field declared twice -- the dicts above keep only its last declaration) fails
+        # the check without triggering any of the messages above. Report it explicitly.
+        core_order = [f[0] for f in core_fields if f[0] in common]
+        sdk_order = [f[0] for f in sdk_fields if f[0] in common]
+        if core_order != sdk_order:
+            print(
+                "\n  Shared fields are declared in a different order (or more than once):"
+                f"\n    airflow-core: {core_order}"
+                f"\n    task-sdk:     {sdk_order}",
+                file=sys.stderr,
+            )
+
+    print(
+        f"\nUpdate both files together so the two {CLASS_NAME} definitions stay in sync.",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/task-sdk/src/airflow/sdk/execution_time/workloads/__init__.py b/task-sdk/src/airflow/sdk/execution_time/workloads/__init__.py
new file mode 100644
index 00000000000..cdf955e742d
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/execution_time/workloads/__init__.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Workload schemas for Task SDK execution-time communication."""
+
+from __future__ import annotations
+
+from airflow.sdk.execution_time.workloads.task import BaseTaskInstanceDTO, TaskInstanceDTO
+
+__all__ = ["BaseTaskInstanceDTO", "TaskInstanceDTO"]
diff --git a/task-sdk/src/airflow/sdk/execution_time/workloads/task.py b/task-sdk/src/airflow/sdk/execution_time/workloads/task.py
new file mode 100644
index 00000000000..ceff200856f
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/execution_time/workloads/task.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task workload schemas for Task SDK execution-time communication."""
+
+from __future__ import annotations
+
+import uuid
+
+from pydantic import BaseModel, Field
+
+
+class BaseTaskInstanceDTO(BaseModel):
+    """
+    Base schema for TaskInstance with the minimal fields shared by Executors and the Task SDK.
+
+    This class is duplicated in :mod:`airflow.executors.workloads.task` and the
+    two definitions are kept in sync by the ``check-task-instance-dto-sync``
+    prek hook. Update both files together.
+
+    The hook compares the class bases and every annotated field (name, annotation
+    and default, in declaration order); comments and docstrings are not compared.
+    """
+
+    id: uuid.UUID
+    dag_version_id: uuid.UUID
+    task_id: str
+    dag_id: str
+    run_id: str
+    try_number: int
+    map_index: int = -1
+
+    pool_slots: int
+    queue: str
+    priority_weight: int
+    # ``exclude=True`` leaves this field out of pydantic serialization output (model_dump / JSON).
+    executor_config: dict | None = Field(default=None, exclude=True)
+
+    # NOTE(review): presumably tracing-context carriers propagated to the task; confirm with callers.
+    parent_context_carrier: dict | None = None
+    context_carrier: dict | None = None
+
+
+class TaskInstanceDTO(BaseTaskInstanceDTO):
+    """
+    Task SDK TaskInstanceDTO.
+
+    Adds nothing to :class:`BaseTaskInstanceDTO`. The airflow-core counterpart additionally
+    defines the executor-only ``external_executor_id`` field and a ``key`` property built on
+    ``airflow.models``, which the Task SDK does not have access to.
+    """

Reply via email to