This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 363fe113232ab5d13ddeade5d2144fcbe02a18e8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 3 18:37:00 2025 +0800

    [v3-1-test] Make `extra` of Asset, AssetAlias, AssetEvent a `dict[str, JsonValue]` (#57352) (#57728)
    
    Co-authored-by: Wei Lee <[email protected]>
    fixup! [v3-1-test] Make `extra` of Asset, AssetAlias, AssetEvent a `dict[str, JsonValue]` (#57352) (cherry picked from commit c25455ef4df1b2caa2a19d5a8568fef1cc314f04)
---
 .../api_fastapi/core_api/datamodels/assets.py      |   6 +-
 .../core_api/openapi/v2-rest-api-generated.yaml    |   6 +-
 .../api_fastapi/execution_api/datamodels/asset.py  |   4 +-
 .../execution_api/datamodels/asset_event.py        |   4 +-
 .../execution_api/datamodels/taskinstance.py       |   5 +-
 airflow-core/src/airflow/lineage/hook.py           |   8 +-
 airflow-core/src/airflow/models/taskinstance.py    |  14 ++-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |   8 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   4 +-
 .../tests/unit/models/test_taskinstance.py         |  20 ++-
 .../unit/serialization/test_serialized_objects.py  |   6 +-
 .../src/airflowctl/api/datamodels/generated.py     |   4 +-
 .../tests/airflow_ctl/api/test_operations.py       |   2 +-
 .../src/airflow/sdk/api/datamodels/_generated.py   | 140 ++++++++++-----------
 .../src/airflow/sdk/definitions/asset/__init__.py  |  16 +--
 .../airflow/sdk/definitions/asset/decorators.py    |   4 +-
 .../src/airflow/sdk/definitions/asset/metadata.py  |   6 +-
 task-sdk/src/airflow/sdk/execution_time/context.py |   6 +-
 .../src/airflow/sdk/execution_time/task_runner.py  |   2 +-
 task-sdk/src/airflow/sdk/types.py                  |   8 +-
 20 files changed, 155 insertions(+), 118 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
index dd08cc16ec0..00f58257b83 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from pydantic import AliasPath, ConfigDict, Field, NonNegativeInt, 
field_validator
+from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt, 
field_validator
 
 from airflow._shared.secrets_masker import redact
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
@@ -65,7 +65,7 @@ class AssetResponse(BaseModel):
     name: str
     uri: str
     group: str
-    extra: dict | None = None
+    extra: dict[str, JsonValue] | None = None
     created_at: datetime
     updated_at: datetime
     scheduled_dags: list[DagScheduleAssetReference]
@@ -123,7 +123,7 @@ class AssetEventResponse(BaseModel):
     uri: str | None = Field(alias="uri", default=None)
     name: str | None = Field(alias="name", default=None)
     group: str | None = Field(alias="group", default=None)
-    extra: dict | None = None
+    extra: dict[str, JsonValue] | None = None
     source_task_id: str | None = None
     source_dag_id: str | None = None
     source_run_id: str | None = None
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 8a631c19096..3c1729d1c97 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -8666,7 +8666,8 @@ components:
           title: Group
         extra:
           anyOf:
-          - additionalProperties: true
+          - additionalProperties:
+              $ref: '#/components/schemas/JsonValue'
             type: object
           - type: 'null'
           title: Extra
@@ -8722,7 +8723,8 @@ components:
           title: Group
         extra:
           anyOf:
-          - additionalProperties: true
+          - additionalProperties:
+              $ref: '#/components/schemas/JsonValue'
             type: object
           - type: 'null'
           title: Extra
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset.py
index 4cd23ddbb82..c11bccdd59a 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+from pydantic.types import JsonValue
+
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 
 
@@ -26,7 +28,7 @@ class AssetResponse(BaseModel):
     name: str
     uri: str
     group: str
-    extra: dict | None = None
+    extra: dict[str, JsonValue] | None = None
 
 
 class AssetAliasResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
index a9a66f242fc..b050133212e 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 
 from datetime import datetime
 
+from pydantic.types import JsonValue
+
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 from airflow.api_fastapi.execution_api.datamodels.asset import AssetResponse
 
@@ -41,7 +43,7 @@ class AssetEventResponse(BaseModel):
 
     id: int
     timestamp: datetime
-    extra: dict | None = None
+    extra: dict[str, JsonValue] | None = None
 
     asset: AssetResponse
     created_dagruns: list[DagRunAssetReference]
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 9f639b1e391..f2e7aa36b0d 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -25,6 +25,7 @@ from pydantic import (
     AwareDatetime,
     Discriminator,
     Field,
+    JsonValue,
     Tag,
     TypeAdapter,
     WithJsonSchema,
@@ -258,7 +259,7 @@ class AssetReferenceAssetEventDagRun(StrictBaseModel):
 
     name: str
     uri: str
-    extra: dict
+    extra: dict[str, JsonValue]
 
 
 class AssetAliasReferenceAssetEventDagRun(StrictBaseModel):
@@ -271,7 +272,7 @@ class AssetEventDagRunReference(StrictBaseModel):
     """Schema for AssetEvent model used in DagRun."""
 
     asset: AssetReferenceAssetEventDagRun
-    extra: dict
+    extra: dict[str, JsonValue]
     source_task_id: str | None
     source_dag_id: str | None
     source_run_id: str | None
diff --git a/airflow-core/src/airflow/lineage/hook.py 
b/airflow-core/src/airflow/lineage/hook.py
index b69f12484dc..ede28a7855f 100644
--- a/airflow-core/src/airflow/lineage/hook.py
+++ b/airflow-core/src/airflow/lineage/hook.py
@@ -29,6 +29,8 @@ from airflow.sdk.definitions.asset import Asset
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
+    from pydantic.types import JsonValue
+
     from airflow.sdk import BaseHook, ObjectStoragePath
 
     # Store context what sent lineage.
@@ -107,7 +109,7 @@ class HookLineageCollector(LoggingMixin):
         name: str | None = None,
         group: str | None = None,
         asset_kwargs: dict | None = None,
-        asset_extra: dict | None = None,
+        asset_extra: dict[str, JsonValue] | None = None,
     ) -> Asset | None:
         """
         Create an asset instance using the provided parameters.
@@ -161,7 +163,7 @@ class HookLineageCollector(LoggingMixin):
         name: str | None = None,
         group: str | None = None,
         asset_kwargs: dict | None = None,
-        asset_extra: dict | None = None,
+        asset_extra: dict[str, JsonValue] | None = None,
     ):
         """Add the input asset and its corresponding hook execution context to 
the collector."""
         if len(self._inputs) >= MAX_COLLECTED_ASSETS:
@@ -186,7 +188,7 @@ class HookLineageCollector(LoggingMixin):
         name: str | None = None,
         group: str | None = None,
         asset_kwargs: dict | None = None,
-        asset_extra: dict | None = None,
+        asset_extra: dict[str, JsonValue] | None = None,
     ):
         """Add the output asset and its corresponding hook execution context 
to the collector."""
         if len(self._outputs) >= MAX_COLLECTED_ASSETS:
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index e9806696e24..7b291841103 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import contextlib
 import hashlib
 import itertools
+import json
 import logging
 import math
 import uuid
@@ -1380,7 +1381,7 @@ class TaskInstance(Base, LoggingMixin):
                     session=session,
                 )
 
-        def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, 
frozenset], set[str]]:
+        def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, 
str], set[str]]:
             d = defaultdict(set)
             for event in outlet_events:
                 try:
@@ -1390,19 +1391,20 @@ class TaskInstance(Base, LoggingMixin):
                 if alias_name not in outlet_alias_names:
                     continue
                 asset_key = AssetUniqueKey(**event["dest_asset_key"])
-                extra_key = frozenset(event["extra"].items())
-                d[asset_key, extra_key].add(alias_name)
+                extra_json = json.dumps(event["extra"], sort_keys=True)
+                d[asset_key, extra_json].add(alias_name)
             return d
 
         outlet_alias_names = {o.name for o in task_outlets if o.type == 
AssetAlias.__name__ and o.name}
         if outlet_alias_names and (event_extras_from_aliases := 
_asset_event_extras_from_aliases()):
-            for (asset_key, extra_key), event_aliase_names in 
event_extras_from_aliases.items():
+            for (asset_key, extra_json), event_aliase_names in 
event_extras_from_aliases.items():
+                extra = json.loads(extra_json)
                 ti.log.debug("register event for asset %s with aliases %s", 
asset_key, event_aliase_names)
                 event = asset_manager.register_asset_change(
                     task_instance=ti,
                     asset=asset_key,
                     source_alias_names=event_aliase_names,
-                    extra=dict(extra_key),
+                    extra=extra,
                     session=session,
                 )
                 if event is None:
@@ -1413,7 +1415,7 @@ class TaskInstance(Base, LoggingMixin):
                         task_instance=ti,
                         asset=asset_key,
                         source_alias_names=event_aliase_names,
-                        extra=dict(extra_key),
+                        extra=extra,
                         session=session,
                     )
 
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f9c93f97601..4f503ef9492 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -209,7 +209,9 @@ export const $AssetEventResponse = {
         extra: {
             anyOf: [
                 {
-                    additionalProperties: true,
+                    additionalProperties: {
+                        '$ref': '#/components/schemas/JsonValue'
+                    },
                     type: 'object'
                 },
                 {
@@ -295,7 +297,9 @@ export const $AssetResponse = {
         extra: {
             anyOf: [
                 {
-                    additionalProperties: true,
+                    additionalProperties: {
+                        '$ref': '#/components/schemas/JsonValue'
+                    },
                     type: 'object'
                 },
                 {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 1396090691c..f2e2c84f812 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -64,7 +64,7 @@ export type AssetEventResponse = {
     name?: string | null;
     group?: string | null;
     extra?: {
-    [key: string]: unknown;
+    [key: string]: JsonValue;
 } | null;
     source_task_id?: string | null;
     source_dag_id?: string | null;
@@ -83,7 +83,7 @@ export type AssetResponse = {
     uri: string;
     group: string;
     extra?: {
-    [key: string]: unknown;
+    [key: string]: JsonValue;
 } | null;
     created_at: string;
     updated_at: string;
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index 74260a05a11..4c7a0d9fd5c 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -22,7 +22,7 @@ import datetime
 import operator
 import os
 import pathlib
-from typing import cast
+from typing import TYPE_CHECKING, cast
 from unittest import mock
 from unittest.mock import patch
 
@@ -88,6 +88,11 @@ from tests_common.test_utils.db import clear_db_connections, 
clear_db_runs
 from tests_common.test_utils.mock_operators import MockOperator
 from unit.models import DEFAULT_DATE
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from tests_common.pytest_plugin import DagMaker
+
 pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag, 
pytest.mark.want_activate_assets]
 
 
@@ -1602,14 +1607,19 @@ class TestTaskInstance:
         ti.set_duration()
         assert ti.duration is None
 
-    def test_outlet_asset_extra(self, dag_maker, session):
+    def test_outlet_asset_extra(self, dag_maker: DagMaker, session: Session):
         from airflow.sdk.definitions.asset import Asset
 
         with dag_maker(schedule=None, serialized=True, session=session):
 
             @task(outlets=Asset("test_outlet_asset_extra_1"))
-            def write1(*, outlet_events):
-                outlet_events[Asset("test_outlet_asset_extra_1")].extra = 
{"foo": "bar"}
+            def write1(*, outlet_events=None):
+                if TYPE_CHECKING:
+                    assert isinstance(outlet_events, dict)
+                outlet_events[Asset("test_outlet_asset_extra_1")].extra = {
+                    "foo": "bar",
+                    "this": {"is": "nested", "value": 1},
+                }
 
             write1()
 
@@ -1634,7 +1644,7 @@ class TestTaskInstance:
         assert events["write1"].source_run_id == dr.run_id
         assert events["write1"].source_task_id == "write1"
         assert events["write1"].asset.uri == "test_outlet_asset_extra_1"
-        assert events["write1"].extra == {"foo": "bar"}
+        assert events["write1"].extra == {"foo": "bar", "this": {"is": 
"nested", "value": 1}}
 
         assert events["write2"].source_dag_id == dr.dag_id
         assert events["write2"].source_run_id == dr.run_id
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 6dc503ff099..14ca6a5a2cc 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -22,6 +22,7 @@ import math
 import sys
 from collections.abc import Iterator
 from datetime import datetime, timedelta
+from typing import TYPE_CHECKING
 
 import pendulum
 import pytest
@@ -85,6 +86,9 @@ from airflow.utils.types import DagRunType
 
 from unit.models import DEFAULT_DATE
 
+if TYPE_CHECKING:
+    from pydantic.types import JsonValue
+
 DAG_ID = "dag_id_1"
 
 TEST_CALLBACK_PATH = f"{__name__}.empty_callback_for_deadline"
@@ -227,7 +231,7 @@ EmptyOperator(task_id="task1", dag=DAG_WITH_TASKS)
 
 
 def create_outlet_event_accessors(
-    key: Asset | AssetAlias, extra: dict, asset_alias_events: 
list[AssetAliasEvent]
+    key: Asset | AssetAlias, extra: dict[str, JsonValue], asset_alias_events: 
list[AssetAliasEvent]
 ) -> OutletEventAccessors:
     o = OutletEventAccessors()
     o[key].extra = extra
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 9e8f9f7c759..48b96aeaa0c 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1028,7 +1028,7 @@ class AssetEventResponse(BaseModel):
     uri: Annotated[str | None, Field(title="Uri")] = None
     name: Annotated[str | None, Field(title="Name")] = None
     group: Annotated[str | None, Field(title="Group")] = None
-    extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
+    extra: Annotated[dict[str, JsonValue] | None, Field(title="Extra")] = None
     source_task_id: Annotated[str | None, Field(title="Source Task Id")] = None
     source_dag_id: Annotated[str | None, Field(title="Source Dag Id")] = None
     source_run_id: Annotated[str | None, Field(title="Source Run Id")] = None
@@ -1046,7 +1046,7 @@ class AssetResponse(BaseModel):
     name: Annotated[str, Field(title="Name")]
     uri: Annotated[str, Field(title="Uri")]
     group: Annotated[str, Field(title="Group")]
-    extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
+    extra: Annotated[dict[str, JsonValue] | None, Field(title="Extra")] = None
     created_at: Annotated[datetime, Field(title="Created At")]
     updated_at: Annotated[datetime, Field(title="Updated At")]
     scheduled_dags: Annotated[list[DagScheduleAssetReference], 
Field(title="Scheduled Dags")]
diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py 
b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
index 01df0de6577..1e7391b99d2 100644
--- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py
+++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py
@@ -200,7 +200,7 @@ class TestAssetsOperations:
         id=asset_id,
         name="asset",
         uri="asset_uri",
-        extra={"extra": "extra"},
+        extra={"extra": "extra"},  # type: ignore[dict-item]
         created_at=datetime.datetime(2024, 12, 31, 23, 59, 59),
         updated_at=datetime.datetime(2025, 1, 1, 0, 0, 0),
         scheduled_dags=[],
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index d288fecc1f2..b216c4372e3 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -63,30 +63,6 @@ class AssetProfile(BaseModel):
     type: Annotated[str, Field(title="Type")]
 
 
-class AssetReferenceAssetEventDagRun(BaseModel):
-    """
-    Schema for AssetModel used in AssetEventDagRunReference.
-    """
-
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    name: Annotated[str, Field(title="Name")]
-    uri: Annotated[str, Field(title="Uri")]
-    extra: Annotated[dict[str, Any], Field(title="Extra")]
-
-
-class AssetResponse(BaseModel):
-    """
-    Asset schema for responses with fields that are needed for Runtime.
-    """
-
-    name: Annotated[str, Field(title="Name")]
-    uri: Annotated[str, Field(title="Uri")]
-    group: Annotated[str, Field(title="Group")]
-    extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
-
-
 class ConnectionResponse(BaseModel):
     """
     Connection schema for responses with fields that are needed for Runtime.
@@ -477,6 +453,74 @@ class TriggerRule(str, Enum):
     ALL_SKIPPED = "all_skipped"
 
 
+class AssetReferenceAssetEventDagRun(BaseModel):
+    """
+    Schema for AssetModel used in AssetEventDagRunReference.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    name: Annotated[str, Field(title="Name")]
+    uri: Annotated[str, Field(title="Uri")]
+    extra: Annotated[dict[str, JsonValue], Field(title="Extra")]
+
+
+class AssetResponse(BaseModel):
+    """
+    Asset schema for responses with fields that are needed for Runtime.
+    """
+
+    name: Annotated[str, Field(title="Name")]
+    uri: Annotated[str, Field(title="Uri")]
+    group: Annotated[str, Field(title="Group")]
+    extra: Annotated[dict[str, JsonValue] | None, Field(title="Extra")] = None
+
+
+class HITLDetailRequest(BaseModel):
+    """
+    Schema for the request part of a Human-in-the-loop detail for a specific 
task instance.
+    """
+
+    ti_id: Annotated[UUID, Field(title="Ti Id")]
+    options: Annotated[list[str], Field(min_length=1, title="Options")]
+    subject: Annotated[str, Field(title="Subject")]
+    body: Annotated[str | None, Field(title="Body")] = None
+    defaults: Annotated[list[str] | None, Field(title="Defaults")] = None
+    multiple: Annotated[bool | None, Field(title="Multiple")] = False
+    params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
+    assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned 
Users")] = None
+
+
+class HITLDetailResponse(BaseModel):
+    """
+    Schema for the response part of a Human-in-the-loop detail for a specific 
task instance.
+    """
+
+    response_received: Annotated[bool, Field(title="Response Received")]
+    responded_by_user: HITLUser | None = None
+    responded_at: Annotated[AwareDatetime | None, Field(title="Responded At")] 
= None
+    chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] 
= None
+    params_input: Annotated[dict[str, Any] | None, Field(title="Params 
Input")] = None
+
+
+class HTTPValidationError(BaseModel):
+    detail: Annotated[list[ValidationError] | None, Field(title="Detail")] = 
None
+
+
+class TITerminalStatePayload(BaseModel):
+    """
+    Schema for updating TaskInstance to a terminal state except SUCCESS state.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    state: TerminalStateNonSuccess
+    end_date: Annotated[AwareDatetime, Field(title="End Date")]
+    rendered_map_index: Annotated[str | None, Field(title="Rendered Map 
Index")] = None
+
+
 class AssetEventDagRunReference(BaseModel):
     """
     Schema for AssetEvent model used in DagRun.
@@ -486,7 +530,7 @@ class AssetEventDagRunReference(BaseModel):
         extra="forbid",
     )
     asset: AssetReferenceAssetEventDagRun
-    extra: Annotated[dict[str, Any], Field(title="Extra")]
+    extra: Annotated[dict[str, JsonValue], Field(title="Extra")]
     source_task_id: Annotated[str | None, Field(title="Source Task Id")] = None
     source_dag_id: Annotated[str | None, Field(title="Source Dag Id")] = None
     source_run_id: Annotated[str | None, Field(title="Source Run Id")] = None
@@ -502,7 +546,7 @@ class AssetEventResponse(BaseModel):
 
     id: Annotated[int, Field(title="Id")]
     timestamp: Annotated[AwareDatetime, Field(title="Timestamp")]
-    extra: Annotated[dict[str, Any] | None, Field(title="Extra")] = None
+    extra: Annotated[dict[str, JsonValue] | None, Field(title="Extra")] = None
     asset: AssetResponse
     created_dagruns: Annotated[list[DagRunAssetReference], 
Field(title="Created Dagruns")]
     source_task_id: Annotated[str | None, Field(title="Source Task Id")] = None
@@ -543,37 +587,6 @@ class DagRun(BaseModel):
     consumed_asset_events: Annotated[list[AssetEventDagRunReference], 
Field(title="Consumed Asset Events")]
 
 
-class HITLDetailRequest(BaseModel):
-    """
-    Schema for the request part of a Human-in-the-loop detail for a specific 
task instance.
-    """
-
-    ti_id: Annotated[UUID, Field(title="Ti Id")]
-    options: Annotated[list[str], Field(min_length=1, title="Options")]
-    subject: Annotated[str, Field(title="Subject")]
-    body: Annotated[str | None, Field(title="Body")] = None
-    defaults: Annotated[list[str] | None, Field(title="Defaults")] = None
-    multiple: Annotated[bool | None, Field(title="Multiple")] = False
-    params: Annotated[dict[str, Any] | None, Field(title="Params")] = None
-    assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned 
Users")] = None
-
-
-class HITLDetailResponse(BaseModel):
-    """
-    Schema for the response part of a Human-in-the-loop detail for a specific 
task instance.
-    """
-
-    response_received: Annotated[bool, Field(title="Response Received")]
-    responded_by_user: HITLUser | None = None
-    responded_at: Annotated[AwareDatetime | None, Field(title="Responded At")] 
= None
-    chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] 
= None
-    params_input: Annotated[dict[str, Any] | None, Field(title="Params 
Input")] = None
-
-
-class HTTPValidationError(BaseModel):
-    detail: Annotated[list[ValidationError] | None, Field(title="Detail")] = 
None
-
-
 class TIRunContext(BaseModel):
     """
     Response schema for TaskInstance run context.
@@ -591,16 +604,3 @@ class TIRunContext(BaseModel):
     next_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Next 
Kwargs")] = None
     xcom_keys_to_clear: Annotated[list[str] | None, Field(title="Xcom Keys To 
Clear")] = None
     should_retry: Annotated[bool | None, Field(title="Should Retry")] = False
-
-
-class TITerminalStatePayload(BaseModel):
-    """
-    Schema for updating TaskInstance to a terminal state except SUCCESS state.
-    """
-
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    state: TerminalStateNonSuccess
-    end_date: Annotated[AwareDatetime, Field(title="End Date")]
-    rendered_map_index: Annotated[str | None, Field(title="Rendered Map 
Index")] = None
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py 
b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
index 4f986efa6bb..98a9e270c62 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -35,6 +35,8 @@ if TYPE_CHECKING:
     from collections.abc import Iterable, Iterator
     from urllib.parse import SplitResult
 
+    from pydantic.types import JsonValue
+
     from airflow.models.asset import AssetModel
     from airflow.sdk.io.path import ObjectStoragePath
     from airflow.serialization.serialized_objects import SerializedAssetWatcher
@@ -216,7 +218,7 @@ def _validate_asset_name(instance, attribute, value):
     return value
 
 
-def _set_extra_default(extra: dict | None) -> dict:
+def _set_extra_default(extra: dict[str, JsonValue] | None) -> dict:
     """
     Automatically convert None to an empty dict.
 
@@ -319,7 +321,7 @@ class Asset(os.PathLike, BaseAsset):
         default=attrs.Factory(operator.attrgetter("asset_type"), 
takes_self=True),
         validator=[_validate_identifier],
     )
-    extra: dict[str, Any] = attrs.field(
+    extra: dict[str, JsonValue] = attrs.field(
         factory=dict,
         converter=_set_extra_default,
     )
@@ -337,7 +339,7 @@ class Asset(os.PathLike, BaseAsset):
         uri: str | ObjectStoragePath,
         *,
         group: str = ...,
-        extra: dict | None = None,
+        extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
     ) -> None:
         """Canonical; both name and uri are provided."""
@@ -348,7 +350,7 @@ class Asset(os.PathLike, BaseAsset):
         name: str,
         *,
         group: str = ...,
-        extra: dict | None = None,
+        extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
     ) -> None:
         """It's possible to only provide the name, either by keyword or as the 
only positional argument."""
@@ -359,7 +361,7 @@ class Asset(os.PathLike, BaseAsset):
         *,
         uri: str | ObjectStoragePath,
         group: str = ...,
-        extra: dict | None = None,
+        extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher | SerializedAssetWatcher] = ...,
     ) -> None:
         """It's possible to only provide the URI as a keyword argument."""
@@ -370,7 +372,7 @@ class Asset(os.PathLike, BaseAsset):
         uri: str | ObjectStoragePath | None = None,
         *,
         group: str | None = None,
-        extra: dict | None = None,
+        extra: dict[str, JsonValue] | None = None,
         watchers: list[AssetWatcher | SerializedAssetWatcher] | None = None,
     ) -> None:
         if name is None and uri is None:
@@ -686,4 +688,4 @@ class AssetAliasEvent(attrs.AttrsInstance):
 
     source_alias_name: str
     dest_asset_key: AssetUniqueKey
-    extra: dict[str, Any]
+    extra: dict[str, JsonValue]
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/decorators.py 
b/task-sdk/src/airflow/sdk/definitions/asset/decorators.py
index 44d23efdbba..eab493f5eaa 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/decorators.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/decorators.py
@@ -29,6 +29,8 @@ from airflow.sdk.exceptions import AirflowRuntimeError
 if TYPE_CHECKING:
     from collections.abc import Callable, Collection, Iterator, Mapping
 
+    from pydantic.types import JsonValue
+
     from airflow.sdk import DAG, AssetAlias, ObjectStoragePath
     from airflow.sdk.bases.decorator import _TaskDecorator
     from airflow.sdk.definitions.asset import AssetUniqueKey
@@ -218,7 +220,7 @@ class asset(_DAGFactory):
     name: str | None = None
     uri: str | ObjectStoragePath | None = None
     group: str = Asset.asset_type
-    extra: dict[str, Any] = attrs.field(factory=dict)
+    extra: dict[str, JsonValue] = attrs.field(factory=dict)
     watchers: list[BaseTrigger] = attrs.field(factory=list)
 
     @attrs.define(kw_only=True)
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/metadata.py 
b/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
index 88e88645429..ee8c42b3ad4 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
@@ -17,11 +17,13 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
 import attrs
 
 if TYPE_CHECKING:
+    from pydantic.types import JsonValue
+
     from airflow.sdk.definitions.asset import Asset, AssetAlias
 
 __all__ = ["Metadata"]
@@ -32,5 +34,5 @@ class Metadata:
     """Metadata to attach to an AssetEvent."""
 
     asset: Asset
-    extra: dict[str, Any] = attrs.field(factory=dict)
+    extra: dict[str, JsonValue] = attrs.field(factory=dict)
     alias: AssetAlias | None = None
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py 
b/task-sdk/src/airflow/sdk/execution_time/context.py
index e4a647bab2f..02b9e90d04a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -44,6 +44,8 @@ from airflow.sdk.log import mask_secret
 if TYPE_CHECKING:
     from uuid import UUID
 
+    from pydantic.types import JsonValue
+
     from airflow.sdk import Variable
     from airflow.sdk.bases.operator import BaseOperator
     from airflow.sdk.definitions.connection import Connection
@@ -477,10 +479,10 @@ class OutletEventAccessor(_AssetRefResolutionMixin):
     """Wrapper to access an outlet asset event in template."""
 
     key: BaseAssetUniqueKey
-    extra: dict[str, Any] = attrs.Factory(dict)
+    extra: dict[str, JsonValue] = attrs.Factory(dict)
     asset_alias_events: list[AssetAliasEvent] = attrs.field(factory=list)
 
-    def add(self, asset: Asset | AssetRef, extra: dict[str, Any] | None = 
None) -> None:
+    def add(self, asset: Asset | AssetRef, extra: dict[str, JsonValue] | None 
= None) -> None:
         """Add an AssetEvent to an existing Asset."""
         if not isinstance(self.key, AssetAliasUniqueKey):
             return
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 71385980271..8352aa7ef5d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -789,7 +789,7 @@ def _build_asset_profiles(lineage_objects: list) -> 
Iterator[AssetProfile]:
             yield AssetProfile(name=obj.name, type=AssetAlias.__name__)
 
 
-def _serialize_outlet_events(events: OutletEventAccessorsProtocol) -> 
Iterator[dict[str, Any]]:
+def _serialize_outlet_events(events: OutletEventAccessorsProtocol) -> 
Iterator[dict[str, JsonValue]]:
     if TYPE_CHECKING:
         assert isinstance(events, OutletEventAccessors)
     # We just collect everything the user recorded in the accessors.
diff --git a/task-sdk/src/airflow/sdk/types.py 
b/task-sdk/src/airflow/sdk/types.py
index 124fb560cc5..c87a7ebd6d1 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -27,7 +27,7 @@ from airflow.sdk.definitions._internal.types import NOTSET, 
ArgNotSet
 if TYPE_CHECKING:
     from collections.abc import Iterator
 
-    from pydantic import AwareDatetime
+    from pydantic import AwareDatetime, JsonValue
 
     from airflow.sdk._shared.logging.types import Logger as Logger
     from airflow.sdk.api.datamodels._generated import TaskInstanceState
@@ -129,17 +129,17 @@ class OutletEventAccessorProtocol(Protocol):
     """Protocol for managing access to a specific outlet event accessor."""
 
     key: BaseAssetUniqueKey
-    extra: dict[str, Any]
+    extra: dict[str, JsonValue]
     asset_alias_events: list[AssetAliasEvent]
 
     def __init__(
         self,
         *,
         key: BaseAssetUniqueKey,
-        extra: dict[str, Any],
+        extra: dict[str, JsonValue],
         asset_alias_events: list[AssetAliasEvent],
     ) -> None: ...
-    def add(self, asset: Asset, extra: dict[str, Any] | None = None) -> None: 
...
+    def add(self, asset: Asset, extra: dict[str, JsonValue] | None = None) -> 
None: ...
 
 
 class OutletEventAccessorsProtocol(Protocol):

Reply via email to