This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66cd0894563 Add note support to TriggerDagRunOperator (#60810)
66cd0894563 is described below

commit 66cd089456330fdc81abfca673712a34df0ef95d
Author: arnoldmr01 <[email protected]>
AuthorDate: Fri Feb 20 07:40:44 2026 -0500

    Add note support to TriggerDagRunOperator (#60810)
    
    * feat: add note to dagrun init
    
    * feat: add note to DagRunTriggerException
    
    * feat: add note support to TriggerDagRunOperator
    
    * feat: add support to task_runner and supervisor
    
    * feat: add note to TriggerDAGRunPayload
    
    # Conflicts:
    #       
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
    #       task-sdk/src/airflow/sdk/api/datamodels/_generated.py
    
    * feat: handle note info
    
    # Conflicts:
    #       airflow-core/src/airflow/api/common/trigger_dag.py
    #       
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
    
    * feat: add log warning to _trigger_dag_af_2
    
    * feat: add cadwyn migration for TIRunContext
    
    * test: update test
    
    * Fix ci error at Pydantic level instead of SQLA
    
    * Use sa_inspect to avoid lazy load on detached instance
    
    * Respect note association_proxy instead of duplicating same logic
    
    ---------
    
    Co-authored-by: Arnold Lin <[email protected]>
    Co-authored-by: LIU ZHE YOU <[email protected]>
---
 airflow-core/src/airflow/api/common/trigger_dag.py |  4 +++
 .../api_fastapi/execution_api/datamodels/dagrun.py |  1 +
 .../execution_api/datamodels/taskinstance.py       | 35 ++++++++++++++++++++++
 .../api_fastapi/execution_api/routes/dag_runs.py   |  1 +
 .../api_fastapi/execution_api/versions/__init__.py |  3 +-
 .../execution_api/versions/v2026_03_31.py          | 20 ++++++++++++-
 airflow-core/src/airflow/models/dagrun.py          |  2 ++
 .../src/airflow/serialization/definitions/dag.py   |  4 +++
 .../execution_api/versions/head/test_dag_runs.py   |  1 +
 .../versions/head/test_task_instances.py           |  1 +
 airflow-core/tests/unit/models/test_dag.py         | 16 ++++++++++
 airflow-core/tests/unit/models/test_dagrun.py      |  1 +
 .../providers/standard/operators/trigger_dagrun.py | 13 +++++++-
 .../unit/standard/operators/test_trigger_dagrun.py | 14 ++++++---
 task-sdk/src/airflow/sdk/api/client.py             |  5 +++-
 .../src/airflow/sdk/api/datamodels/_generated.py   |  2 ++
 task-sdk/src/airflow/sdk/exceptions.py             |  2 ++
 .../src/airflow/sdk/execution_time/supervisor.py   |  6 +---
 .../src/airflow/sdk/execution_time/task_runner.py  |  1 +
 .../task_sdk/execution_time/test_supervisor.py     | 23 ++++++++++++--
 20 files changed, 140 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py 
b/airflow-core/src/airflow/api/common/trigger_dag.py
index 77912f58d26..5b21d4a8c25 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -50,6 +50,7 @@ def _trigger_dag(
     conf: dict | str | None = None,
     logical_date: datetime | None = None,
     replace_microseconds: bool = True,
+    note: str | None = None,
     partition_key: str | None = None,
     session: Session = NEW_SESSION,
 ) -> DagRun | None:
@@ -118,6 +119,7 @@ def _trigger_dag(
         run_type=DagRunType.MANUAL,
         triggered_by=triggered_by,
         triggering_user_name=triggering_user_name,
+        note=note,
         state=DagRunState.QUEUED,
         partition_key=partition_key,
         session=session,
@@ -137,6 +139,7 @@ def trigger_dag(
     conf: dict | str | None = None,
     logical_date: datetime | None = None,
     replace_microseconds: bool = True,
+    note: str | None = None,
     partition_key: str | None = None,
     session: Session = NEW_SESSION,
 ) -> DagRun | None:
@@ -169,6 +172,7 @@ def trigger_dag(
         replace_microseconds=replace_microseconds,
         triggered_by=triggered_by,
         triggering_user_name=triggering_user_name,
+        note=note,
         partition_key=partition_key,
         session=session,
     )
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
index 32b0b7fead0..8619901f717 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
@@ -31,6 +31,7 @@ class TriggerDAGRunPayload(StrictBaseModel):
     conf: dict = Field(default_factory=dict)
     reset_dag_run: bool = False
     partition_key: str | None = None
+    note: str | None = None
 
 
 class DagRunStateResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 513a99f6dc9..1d1c067d4b5 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -29,6 +29,7 @@ from pydantic import (
     Tag,
     TypeAdapter,
     WithJsonSchema,
+    model_validator,
 )
 
 from airflow.api_fastapi.common.types import UtcDateTime
@@ -304,6 +305,40 @@ class DagRun(StrictBaseModel):
     triggering_user_name: str | None = None
     consumed_asset_events: list[AssetEventDagRunReference]
     partition_key: str | None
+    note: str | None = None
+
+    @model_validator(mode="before")
+    @classmethod
+    def extract_dag_run_note(cls, data: Any) -> Any:
+        """Extract the `note` (`str | None` from 
`association_proxy("dag_run_note", "content")`) relationship from `DagRun` to 
prevent `DetachedInstanceError` when constructing `DagRunContext` or 
`TIRunContext` models."""
+        from sqlalchemy import inspect as sa_inspect
+        from sqlalchemy.exc import NoInspectionAvailable
+        from sqlalchemy.orm.state import InstanceState
+
+        if isinstance(data, dict):
+            return data
+
+        # Check if this is a SQLAlchemy model by looking for the inspection 
interface
+        try:
+            insp: InstanceState = sa_inspect(data)
+        except NoInspectionAvailable:
+            # Not a SQLAlchemy object, return as-is for Pydantic to handle
+            return data
+
+        # Check if dag_run_note is already loaded (avoid lazy load on detached 
instance)
+        if "note" in insp.dict:
+            note_value: str | None = insp.dict["note"]
+        else:
+            note_value = None
+
+        # Convert to dict to avoid further lazy loading issues
+        values = {
+            field_name: getattr(data, field_name, None)
+            for field_name in cls.model_fields
+            if field_name != "note"
+        }
+        values["note"] = note_value
+        return values
 
 
 class TIRunContext(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index f0af063b76f..028409c241b 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -123,6 +123,7 @@ def trigger_dag_run(
             triggered_by=DagRunTriggeredByType.OPERATOR,
             replace_microseconds=False,
             partition_key=payload.partition_key,
+            note=payload.note,
             session=session,
         )
     except DagRunAlreadyExists:
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 30d4159f745..c9f4a2f9b66 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -34,13 +34,14 @@ from airflow.api_fastapi.execution_api.versions.v2025_12_08 
import (
     MovePreviousRunEndpoint,
 )
 from airflow.api_fastapi.execution_api.versions.v2026_03_31 import (
+    AddNoteField,
     ModifyDeferredTaskKwargsToJsonValue,
     RemoveUpstreamMapIndexesField,
 )
 
 bundle = VersionBundle(
     HeadVersion(),
-    Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, 
RemoveUpstreamMapIndexesField),
+    Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, 
RemoveUpstreamMapIndexesField, AddNoteField),
     Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
     Version("2025-11-07", AddPartitionKeyField),
     Version("2025-11-05", AddTriggeringUserNameField),
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
index 72e193426da..e592296cf31 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
@@ -21,7 +21,11 @@ from typing import Any
 
 from cadwyn import ResponseInfo, VersionChange, 
convert_response_to_previous_version_for, schema
 
-from airflow.api_fastapi.execution_api.datamodels.taskinstance import 
TIDeferredStatePayload, TIRunContext
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
+    DagRun,
+    TIDeferredStatePayload,
+    TIRunContext,
+)
 
 
 class ModifyDeferredTaskKwargsToJsonValue(VersionChange):
@@ -50,3 +54,17 @@ class RemoveUpstreamMapIndexesField(VersionChange):
     def add_upstream_map_indexes_field(response: ResponseInfo) -> None:  # 
type: ignore[misc]
         """Add upstream_map_indexes field with None for older API versions."""
         response.body["upstream_map_indexes"] = None
+
+
+class AddNoteField(VersionChange):
+    """Add note parameter to DagRun Model."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = 
(schema(DagRun).field("note").didnt_exist,)
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: 
ignore[arg-type]
+    def remove_note_field(response: ResponseInfo) -> None:  # type: 
ignore[misc]
+        """Remove note field for older API versions."""
+        if "dag_run" in response.body and isinstance(response.body["dag_run"], 
dict):
+            response.body["dag_run"].pop("note", None)
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index e05e3c7da3e..d8d88f40ab1 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -329,6 +329,7 @@ class DagRun(Base, LoggingMixin):
         backfill_id: NonNegativeInt | None = None,
         bundle_version: str | None = None,
         partition_key: str | None = None,
+        note: str | None = None,
     ):
         # For manual runs where logical_date is None, ensure no data_interval 
is set.
         if logical_date is None and data_interval is not None:
@@ -349,6 +350,7 @@ class DagRun(Base, LoggingMixin):
             self.run_after = run_after
         self.start_date = start_date
         self.conf = conf or {}
+        self.note = note
         if state is not None:
             self.state = state
         if not is_arg_set(queued_at):
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py 
b/airflow-core/src/airflow/serialization/definitions/dag.py
index b1a2dc8da59..ac83aac52dc 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -500,6 +500,7 @@ class SerializedDAG:
         creating_job_id: int | None = None,
         backfill_id: NonNegativeInt | None = None,
         partition_key: str | None = None,
+        note: str | None = None,
         session: Session = NEW_SESSION,
     ) -> DagRun:
         """
@@ -583,6 +584,7 @@ class SerializedDAG:
             triggered_by=triggered_by,
             triggering_user_name=triggering_user_name,
             partition_key=partition_key,
+            note=note,
             session=session,
         )
 
@@ -1111,6 +1113,7 @@ def _create_orm_dagrun(
     triggered_by: DagRunTriggeredByType,
     triggering_user_name: str | None = None,
     partition_key: str | None = None,
+    note: str | None = None,
     session: Session = NEW_SESSION,
 ) -> DagRun:
     bundle_version = None
@@ -1138,6 +1141,7 @@ def _create_orm_dagrun(
         backfill_id=backfill_id,
         bundle_version=bundle_version,
         partition_key=partition_key,
+        note=note,
     )
     # Load defaults into the following two fields to ensure result can be 
serialized detached
     max_log_template_id = 
session.scalar(select(func.max(LogTemplate.__table__.c.id)))
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 1691ad53531..ebc6a7e5aa3 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -272,6 +272,7 @@ class TestDagRunDetail:
             "start_date": "2023-01-02T00:00:00Z",
             "state": "success",
             "triggering_user_name": None,
+            "note": None,
         }
 
     def test_dag_run_not_found(self, client):
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 2eba8a045a8..ea1153f01cb 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -200,6 +200,7 @@ class TestTIRunState:
                 "triggering_user_name": None,
                 "consumed_asset_events": [],
                 "partition_key": None,
+                "note": None,
             },
             "task_reschedule_count": 0,
             "max_tries": max_tries,
diff --git a/airflow-core/tests/unit/models/test_dag.py 
b/airflow-core/tests/unit/models/test_dag.py
index 5471fdc0f17..7da733ed17f 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1197,6 +1197,22 @@ class TestDag:
         )
         assert dr.creating_job_id == job_id
 
+    def test_create_dagrun_note_is_set(self, testing_dag_bundle):
+        note = "This is a test note"
+        dag = DAG(dag_id="test_create_dagrun_note_is_set", schedule=None)
+        scheduler_dag = sync_dag_to_db(dag)
+        dr = scheduler_dag.create_dagrun(
+            run_id="test_create_dagrun_note_is_set",
+            logical_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            run_after=DEFAULT_DATE,
+            run_type=DagRunType.MANUAL,
+            state=State.NONE,
+            note=note,
+            triggered_by=DagRunTriggeredByType.TEST,
+        )
+        assert dr.note == note
+
     @pytest.mark.parametrize("partition_key", [None, "my-key", 123])
     def test_create_dagrun_partition_key(self, partition_key, dag_maker):
         with dag_maker("test_create_dagrun_partition_key"):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py 
b/airflow-core/tests/unit/models/test_dagrun.py
index 070e0f62a42..e5bb6e31593 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -687,6 +687,7 @@ class TestDagRun:
         )
 
     def test_dagrun_update_state_with_handle_callback_failure(self, 
testing_dag_bundle, dag_maker, session):
+
         def on_failure_callable(context):
             assert context["dag_run"].dag_id == 
"test_dagrun_update_state_with_handle_callback_failure"
 
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index ae3f978da43..26b990f9c9e 100644
--- 
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ 
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import inspect
 import json
 import time
 from collections.abc import Sequence
@@ -179,6 +180,7 @@ class TriggerDagRunOperator(BaseOperator):
         failed_states: list[str | DagRunState] | None = None,
         skip_when_already_exists: bool = False,
         fail_when_dag_is_paused: bool = False,
+        note: str | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         openlineage_inject_parent_info: bool = True,
         **kwargs,
@@ -201,6 +203,7 @@ class TriggerDagRunOperator(BaseOperator):
         self.skip_when_already_exists = skip_when_already_exists
         self.fail_when_dag_is_paused = fail_when_dag_is_paused
         self.openlineage_inject_parent_info = openlineage_inject_parent_info
+        self.note = note
         self.deferrable = deferrable
         self.logical_date = logical_date
         if logical_date is NOTSET:
@@ -274,7 +277,7 @@ class TriggerDagRunOperator(BaseOperator):
     def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
         from airflow.providers.common.compat.sdk import DagRunTriggerException
 
-        raise DagRunTriggerException(
+        kwargs_accepted = dict(
             trigger_dag_id=self.trigger_dag_id,
             dag_run_id=run_id,
             conf=self.conf,
@@ -288,8 +291,16 @@ class TriggerDagRunOperator(BaseOperator):
             deferrable=self.deferrable,
         )
 
+        if self.note and "note" in 
inspect.signature(DagRunTriggerException.__init__).parameters:
+            kwargs_accepted["note"] = self.note
+
+        raise DagRunTriggerException(**kwargs_accepted)
+
     def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
         try:
+            if self.note:
+                self.log.warning("Parameter 'note' is not supported in Airflow 
2.x and will be ignored.")
+
             dag_run = trigger_dag(
                 dag_id=self.trigger_dag_id,
                 run_id=run_id,
diff --git 
a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py 
b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
index 0f8d1716581..29b4132b235 100644
--- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
+++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
@@ -114,9 +114,7 @@ class TestDagRunOperator:
         """
         with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
             task = TriggerDagRunOperator(
-                task_id="test_task",
-                trigger_dag_id=TRIGGERED_DAG_ID,
-                conf={"foo": "bar"},
+                task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, 
conf={"foo": "bar"}, note="Test note"
             )
 
             # Ensure correct exception is raised
@@ -131,6 +129,8 @@ class TestDagRunOperator:
             assert exc_info.value.wait_for_completion is False
             assert exc_info.value.allowed_states == [DagRunState.SUCCESS]
             assert exc_info.value.failed_states == [DagRunState.FAILED]
+            if getattr(exc_info, "note", None) is not None:
+                assert exc_info.value.note == "Test note"
 
             expected_run_id = DagRun.generate_run_id(
                 run_type=DagRunType.MANUAL, run_after=timezone.utcnow()
@@ -556,13 +556,19 @@ class TestDagRunOperatorAF2:
         """Test TriggerDagRunOperator."""
         with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
             with dag_maker(TEST_DAG_ID, default_args={"start_date": 
DEFAULT_DATE}, serialized=True):
-                task = TriggerDagRunOperator(task_id="test_task", 
trigger_dag_id=TRIGGERED_DAG_ID)
+                task = TriggerDagRunOperator(
+                    task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, 
note="Test note"
+                )
+            mock_warning = mock.patch.object(task.log, "warning").start()
             dag_maker.sync_dagbag_to_db()
             parse_and_sync_to_db(self.f_name)
             dag_maker.create_dagrun()
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
             dagrun = 
dag_maker.session.scalar(select(DagRun).where(DagRun.dag_id == 
TRIGGERED_DAG_ID))
+            assert mock_warning.mock_calls == [
+                mock.call("Parameter 'note' is not supported in Airflow 2.x 
and will be ignored.")
+            ]
             assert dagrun.run_type == DagRunType.MANUAL
             assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
dagrun.logical_date)
 
diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index 3fc023e6c78..f7106bb4aa5 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -695,9 +695,12 @@ class DagRunOperations:
         conf: dict | None = None,
         logical_date: datetime | None = None,
         reset_dag_run: bool = False,
+        note: str | None = None,
     ) -> OKResponse | ErrorResponse:
         """Trigger a Dag run via the API server."""
-        body = TriggerDAGRunPayload(logical_date=logical_date, conf=conf or 
{}, reset_dag_run=reset_dag_run)
+        body = TriggerDAGRunPayload(
+            logical_date=logical_date, conf=conf or {}, 
reset_dag_run=reset_dag_run, note=note
+        )
 
         try:
             self.client.post(
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 32824a48b21..190f833fc68 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -357,6 +357,7 @@ class TriggerDAGRunPayload(BaseModel):
     conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
     reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+    note: Annotated[str | None, Field(title="Note")] = None
 
 
 class UpdateHITLDetailPayload(BaseModel):
@@ -628,6 +629,7 @@ class DagRun(BaseModel):
     triggering_user_name: Annotated[str | None, Field(title="Triggering User 
Name")] = None
     consumed_asset_events: Annotated[list[AssetEventDagRunReference], 
Field(title="Consumed Asset Events")]
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+    note: Annotated[str | None, Field(title="Note")] = None
 
 
 class TIRunContext(BaseModel):
diff --git a/task-sdk/src/airflow/sdk/exceptions.py 
b/task-sdk/src/airflow/sdk/exceptions.py
index 92185df5ced..9f8a5f11fbf 100644
--- a/task-sdk/src/airflow/sdk/exceptions.py
+++ b/task-sdk/src/airflow/sdk/exceptions.py
@@ -248,6 +248,7 @@ class DagRunTriggerException(AirflowException):
         failed_states: list[str],
         poke_interval: int,
         deferrable: bool,
+        note: str | None = None,
     ):
         super().__init__()
         self.trigger_dag_id = trigger_dag_id
@@ -261,6 +262,7 @@ class DagRunTriggerException(AirflowException):
         self.failed_states = failed_states
         self.poke_interval = poke_interval
         self.deferrable = deferrable
+        self.note = note
 
 
 class DownstreamTasksSkipped(AirflowException):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b87131aa733..b0878f766e2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1371,11 +1371,7 @@ class ActivitySubprocess(WatchedSubprocess):
             dump_opts = {"exclude_unset": True}
         elif isinstance(msg, TriggerDagRun):
             resp = self.client.dag_runs.trigger(
-                msg.dag_id,
-                msg.run_id,
-                msg.conf,
-                msg.logical_date,
-                msg.reset_dag_run,
+                msg.dag_id, msg.run_id, msg.conf, msg.logical_date, 
msg.reset_dag_run, msg.note
             )
         elif isinstance(msg, GetDagRun):
             dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 7650bd56f27..3d641c36fe0 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1368,6 +1368,7 @@ def _handle_trigger_dag_run(
             logical_date=drte.logical_date,
             conf=drte.conf,
             reset_dag_run=drte.reset_dag_run,
+            note=drte.note,
         ),
     )
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 6ce05fefa11..9215f00f26c 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2075,7 +2075,24 @@ REQUEST_TEST_CASES = [
         expected_body={"ok": True, "type": "OKResponse"},
         client_mock=ClientMock(
             method_path="dag_runs.trigger",
-            args=("test_dag", "test_run", {"key": "value"}, 
timezone.datetime(2025, 1, 1), True),
+            args=("test_dag", "test_run", {"key": "value"}, 
timezone.datetime(2025, 1, 1), True, None),
+            response=OKResponse(ok=True),
+        ),
+        test_id="dag_run_trigger",
+    ),
+    RequestTestCase(
+        message=TriggerDagRun(
+            dag_id="test_dag",
+            run_id="test_run",
+            conf={"key": "value"},
+            logical_date=timezone.datetime(2025, 1, 1),
+            reset_dag_run=True,
+            note="Test Note",
+        ),
+        expected_body={"ok": True, "type": "OKResponse"},
+        client_mock=ClientMock(
+            method_path="dag_runs.trigger",
+            args=("test_dag", "test_run", {"key": "value"}, 
timezone.datetime(2025, 1, 1), True, "Test Note"),
             response=OKResponse(ok=True),
         ),
         test_id="dag_run_trigger",
@@ -2085,7 +2102,7 @@ REQUEST_TEST_CASES = [
         expected_body={"error": "DAGRUN_ALREADY_EXISTS", "detail": None, 
"type": "ErrorResponse"},
         client_mock=ClientMock(
             method_path="dag_runs.trigger",
-            args=("test_dag", "test_run", None, None, False),
+            args=("test_dag", "test_run", None, None, False, None),
             response=ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS),
         ),
         test_id="dag_run_trigger_already_exists",
@@ -2109,6 +2126,7 @@ REQUEST_TEST_CASES = [
             "conf": None,
             "triggering_user_name": None,
             "type": "DagRunResult",
+            "note": None,
         },
         client_mock=ClientMock(
             method_path="dag_runs.get_detail",
@@ -2159,6 +2177,7 @@ REQUEST_TEST_CASES = [
                 "clear_number": 0,
                 "conf": None,
                 "triggering_user_name": None,
+                "note": None,
             },
             "type": "PreviousDagRunResult",
         },

Reply via email to