This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66cd0894563 Add note support to TriggerDagRunOperator (#60810)
66cd0894563 is described below
commit 66cd089456330fdc81abfca673712a34df0ef95d
Author: arnoldmr01 <[email protected]>
AuthorDate: Fri Feb 20 07:40:44 2026 -0500
Add note support to TriggerDagRunOperator (#60810)
* feat: add note to dagrun init
* feat: add note to DagRunTriggerException
* feat: add note support to TriggerDagRunOperator
* feat: add support to task_runner and supervisor
* feat: add note to TriggerDAGRunPayload
# Conflicts:
# airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
# task-sdk/src/airflow/sdk/api/datamodels/_generated.py
* feat: handle note info
^ Conflicts:
^ airflow-core/src/airflow/api/common/trigger_dag.py
^ airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
* feat: add log warning to _trigger_dag_af_2
* feat: add cadwyn migration for TIRunContext
* test: update test
* Fix ci error at Pydantic level instead of SQLA
* Use sa_inspect to avoid lazy load on detached instance
* Respect note association_proxy instead of duplicating same logic
---------
Co-authored-by: Arnold Lin <[email protected]>
Co-authored-by: LIU ZHE YOU <[email protected]>
---
airflow-core/src/airflow/api/common/trigger_dag.py | 4 +++
.../api_fastapi/execution_api/datamodels/dagrun.py | 1 +
.../execution_api/datamodels/taskinstance.py | 35 ++++++++++++++++++++++
.../api_fastapi/execution_api/routes/dag_runs.py | 1 +
.../api_fastapi/execution_api/versions/__init__.py | 3 +-
.../execution_api/versions/v2026_03_31.py | 20 ++++++++++++-
airflow-core/src/airflow/models/dagrun.py | 2 ++
.../src/airflow/serialization/definitions/dag.py | 4 +++
.../execution_api/versions/head/test_dag_runs.py | 1 +
.../versions/head/test_task_instances.py | 1 +
airflow-core/tests/unit/models/test_dag.py | 16 ++++++++++
airflow-core/tests/unit/models/test_dagrun.py | 1 +
.../providers/standard/operators/trigger_dagrun.py | 13 +++++++-
.../unit/standard/operators/test_trigger_dagrun.py | 14 ++++++---
task-sdk/src/airflow/sdk/api/client.py | 5 +++-
.../src/airflow/sdk/api/datamodels/_generated.py | 2 ++
task-sdk/src/airflow/sdk/exceptions.py | 2 ++
.../src/airflow/sdk/execution_time/supervisor.py | 6 +---
.../src/airflow/sdk/execution_time/task_runner.py | 1 +
.../task_sdk/execution_time/test_supervisor.py | 23 ++++++++++++--
20 files changed, 140 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/trigger_dag.py
b/airflow-core/src/airflow/api/common/trigger_dag.py
index 77912f58d26..5b21d4a8c25 100644
--- a/airflow-core/src/airflow/api/common/trigger_dag.py
+++ b/airflow-core/src/airflow/api/common/trigger_dag.py
@@ -50,6 +50,7 @@ def _trigger_dag(
conf: dict | str | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
+ note: str | None = None,
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | None:
@@ -118,6 +119,7 @@ def _trigger_dag(
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
+ note=note,
state=DagRunState.QUEUED,
partition_key=partition_key,
session=session,
@@ -137,6 +139,7 @@ def trigger_dag(
conf: dict | str | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
+ note: str | None = None,
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | None:
@@ -169,6 +172,7 @@ def trigger_dag(
replace_microseconds=replace_microseconds,
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
+ note=note,
partition_key=partition_key,
session=session,
)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
index 32b0b7fead0..8619901f717 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py
@@ -31,6 +31,7 @@ class TriggerDAGRunPayload(StrictBaseModel):
conf: dict = Field(default_factory=dict)
reset_dag_run: bool = False
partition_key: str | None = None
+ note: str | None = None
class DagRunStateResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 513a99f6dc9..1d1c067d4b5 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -29,6 +29,7 @@ from pydantic import (
Tag,
TypeAdapter,
WithJsonSchema,
+ model_validator,
)
from airflow.api_fastapi.common.types import UtcDateTime
@@ -304,6 +305,40 @@ class DagRun(StrictBaseModel):
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
+ note: str | None = None
+
+ @model_validator(mode="before")
+ @classmethod
+ def extract_dag_run_note(cls, data: Any) -> Any:
+ """Extract the `note` (`str | None` from
`association_proxy("dag_run_note", "content")`) relationship from `DagRun` to
prevent `DetachedInstanceError` when constructing `DagRunContext` or
`TIRunContext` models."""
+ from sqlalchemy import inspect as sa_inspect
+ from sqlalchemy.exc import NoInspectionAvailable
+ from sqlalchemy.orm.state import InstanceState
+
+ if isinstance(data, dict):
+ return data
+
+ # Check if this is a SQLAlchemy model by looking for the inspection
interface
+ try:
+ insp: InstanceState = sa_inspect(data)
+ except NoInspectionAvailable:
+ # Not a SQLAlchemy object, return as-is for Pydantic to handle
+ return data
+
+ # Check if dag_run_note is already loaded (avoid lazy load on detached
instance)
+ if "note" in insp.dict:
+ note_value: str | None = insp.dict["note"]
+ else:
+ note_value = None
+
+ # Convert to dict to avoid further lazy loading issues
+ values = {
+ field_name: getattr(data, field_name, None)
+ for field_name in cls.model_fields
+ if field_name != "note"
+ }
+ values["note"] = note_value
+ return values
class TIRunContext(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index f0af063b76f..028409c241b 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -123,6 +123,7 @@ def trigger_dag_run(
triggered_by=DagRunTriggeredByType.OPERATOR,
replace_microseconds=False,
partition_key=payload.partition_key,
+ note=payload.note,
session=session,
)
except DagRunAlreadyExists:
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 30d4159f745..c9f4a2f9b66 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -34,13 +34,14 @@ from airflow.api_fastapi.execution_api.versions.v2025_12_08
import (
MovePreviousRunEndpoint,
)
from airflow.api_fastapi.execution_api.versions.v2026_03_31 import (
+ AddNoteField,
ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField,
)
bundle = VersionBundle(
HeadVersion(),
- Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField),
+ Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField, AddNoteField),
Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
Version("2025-11-07", AddPartitionKeyField),
Version("2025-11-05", AddTriggeringUserNameField),
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
index 72e193426da..e592296cf31 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
@@ -21,7 +21,11 @@ from typing import Any
from cadwyn import ResponseInfo, VersionChange,
convert_response_to_previous_version_for, schema
-from airflow.api_fastapi.execution_api.datamodels.taskinstance import
TIDeferredStatePayload, TIRunContext
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
+ DagRun,
+ TIDeferredStatePayload,
+ TIRunContext,
+)
class ModifyDeferredTaskKwargsToJsonValue(VersionChange):
@@ -50,3 +54,17 @@ class RemoveUpstreamMapIndexesField(VersionChange):
def add_upstream_map_indexes_field(response: ResponseInfo) -> None: #
type: ignore[misc]
"""Add upstream_map_indexes field with None for older API versions."""
response.body["upstream_map_indexes"] = None
+
+
+class AddNoteField(VersionChange):
+ """Add note parameter to DagRun Model."""
+
+ description = __doc__
+
+ instructions_to_migrate_to_previous_version =
(schema(DagRun).field("note").didnt_exist,)
+
+ @convert_response_to_previous_version_for(TIRunContext) # type:
ignore[arg-type]
+ def remove_note_field(response: ResponseInfo) -> None: # type:
ignore[misc]
+ """Remove note field for older API versions."""
+ if "dag_run" in response.body and isinstance(response.body["dag_run"],
dict):
+ response.body["dag_run"].pop("note", None)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index e05e3c7da3e..d8d88f40ab1 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -329,6 +329,7 @@ class DagRun(Base, LoggingMixin):
backfill_id: NonNegativeInt | None = None,
bundle_version: str | None = None,
partition_key: str | None = None,
+ note: str | None = None,
):
# For manual runs where logical_date is None, ensure no data_interval
is set.
if logical_date is None and data_interval is not None:
@@ -349,6 +350,7 @@ class DagRun(Base, LoggingMixin):
self.run_after = run_after
self.start_date = start_date
self.conf = conf or {}
+ self.note = note
if state is not None:
self.state = state
if not is_arg_set(queued_at):
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index b1a2dc8da59..ac83aac52dc 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -500,6 +500,7 @@ class SerializedDAG:
creating_job_id: int | None = None,
backfill_id: NonNegativeInt | None = None,
partition_key: str | None = None,
+ note: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
"""
@@ -583,6 +584,7 @@ class SerializedDAG:
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
partition_key=partition_key,
+ note=note,
session=session,
)
@@ -1111,6 +1113,7 @@ def _create_orm_dagrun(
triggered_by: DagRunTriggeredByType,
triggering_user_name: str | None = None,
partition_key: str | None = None,
+ note: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
bundle_version = None
@@ -1138,6 +1141,7 @@ def _create_orm_dagrun(
backfill_id=backfill_id,
bundle_version=bundle_version,
partition_key=partition_key,
+ note=note,
)
# Load defaults into the following two fields to ensure result can be
serialized detached
max_log_template_id =
session.scalar(select(func.max(LogTemplate.__table__.c.id)))
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 1691ad53531..ebc6a7e5aa3 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -272,6 +272,7 @@ class TestDagRunDetail:
"start_date": "2023-01-02T00:00:00Z",
"state": "success",
"triggering_user_name": None,
+ "note": None,
}
def test_dag_run_not_found(self, client):
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 2eba8a045a8..ea1153f01cb 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -200,6 +200,7 @@ class TestTIRunState:
"triggering_user_name": None,
"consumed_asset_events": [],
"partition_key": None,
+ "note": None,
},
"task_reschedule_count": 0,
"max_tries": max_tries,
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 5471fdc0f17..7da733ed17f 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1197,6 +1197,22 @@ class TestDag:
)
assert dr.creating_job_id == job_id
+ def test_create_dagrun_note_is_set(self, testing_dag_bundle):
+ note = "This is a test note"
+ dag = DAG(dag_id="test_create_dagrun_note_is_set", schedule=None)
+ scheduler_dag = sync_dag_to_db(dag)
+ dr = scheduler_dag.create_dagrun(
+ run_id="test_create_dagrun_note_is_set",
+ logical_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ run_after=DEFAULT_DATE,
+ run_type=DagRunType.MANUAL,
+ state=State.NONE,
+ note=note,
+ triggered_by=DagRunTriggeredByType.TEST,
+ )
+ assert dr.note == note
+
@pytest.mark.parametrize("partition_key", [None, "my-key", 123])
def test_create_dagrun_partition_key(self, partition_key, dag_maker):
with dag_maker("test_create_dagrun_partition_key"):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 070e0f62a42..e5bb6e31593 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -687,6 +687,7 @@ class TestDagRun:
)
def test_dagrun_update_state_with_handle_callback_failure(self,
testing_dag_bundle, dag_maker, session):
+
def on_failure_callable(context):
assert context["dag_run"].dag_id ==
"test_dagrun_update_state_with_handle_callback_failure"
diff --git
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index ae3f978da43..26b990f9c9e 100644
---
a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++
b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+import inspect
import json
import time
from collections.abc import Sequence
@@ -179,6 +180,7 @@ class TriggerDagRunOperator(BaseOperator):
failed_states: list[str | DagRunState] | None = None,
skip_when_already_exists: bool = False,
fail_when_dag_is_paused: bool = False,
+ note: str | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
openlineage_inject_parent_info: bool = True,
**kwargs,
@@ -201,6 +203,7 @@ class TriggerDagRunOperator(BaseOperator):
self.skip_when_already_exists = skip_when_already_exists
self.fail_when_dag_is_paused = fail_when_dag_is_paused
self.openlineage_inject_parent_info = openlineage_inject_parent_info
+ self.note = note
self.deferrable = deferrable
self.logical_date = logical_date
if logical_date is NOTSET:
@@ -274,7 +277,7 @@ class TriggerDagRunOperator(BaseOperator):
def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
from airflow.providers.common.compat.sdk import DagRunTriggerException
- raise DagRunTriggerException(
+ kwargs_accepted = dict(
trigger_dag_id=self.trigger_dag_id,
dag_run_id=run_id,
conf=self.conf,
@@ -288,8 +291,16 @@ class TriggerDagRunOperator(BaseOperator):
deferrable=self.deferrable,
)
+ if self.note and "note" in
inspect.signature(DagRunTriggerException.__init__).parameters:
+ kwargs_accepted["note"] = self.note
+
+ raise DagRunTriggerException(**kwargs_accepted)
+
def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
try:
+ if self.note:
+ self.log.warning("Parameter 'note' is not supported in Airflow
2.x and will be ignored.")
+
dag_run = trigger_dag(
dag_id=self.trigger_dag_id,
run_id=run_id,
diff --git
a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
index 0f8d1716581..29b4132b235 100644
--- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
+++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
@@ -114,9 +114,7 @@ class TestDagRunOperator:
"""
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
task = TriggerDagRunOperator(
- task_id="test_task",
- trigger_dag_id=TRIGGERED_DAG_ID,
- conf={"foo": "bar"},
+ task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID,
conf={"foo": "bar"}, note="Test note"
)
# Ensure correct exception is raised
@@ -131,6 +129,8 @@ class TestDagRunOperator:
assert exc_info.value.wait_for_completion is False
assert exc_info.value.allowed_states == [DagRunState.SUCCESS]
assert exc_info.value.failed_states == [DagRunState.FAILED]
+ if getattr(exc_info, "note", None) is not None:
+ assert exc_info.value.note == "Test note"
expected_run_id = DagRun.generate_run_id(
run_type=DagRunType.MANUAL, run_after=timezone.utcnow()
@@ -556,13 +556,19 @@ class TestDagRunOperatorAF2:
"""Test TriggerDagRunOperator."""
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
with dag_maker(TEST_DAG_ID, default_args={"start_date":
DEFAULT_DATE}, serialized=True):
- task = TriggerDagRunOperator(task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID)
+ task = TriggerDagRunOperator(
+ task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID,
note="Test note"
+ )
+ mock_warning = mock.patch.object(task.log, "warning").start()
dag_maker.sync_dagbag_to_db()
parse_and_sync_to_db(self.f_name)
dag_maker.create_dagrun()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
dagrun =
dag_maker.session.scalar(select(DagRun).where(DagRun.dag_id ==
TRIGGERED_DAG_ID))
+ assert mock_warning.mock_calls == [
+ mock.call("Parameter 'note' is not supported in Airflow 2.x
and will be ignored.")
+ ]
assert dagrun.run_type == DagRunType.MANUAL
assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL,
dagrun.logical_date)
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index 3fc023e6c78..f7106bb4aa5 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -695,9 +695,12 @@ class DagRunOperations:
conf: dict | None = None,
logical_date: datetime | None = None,
reset_dag_run: bool = False,
+ note: str | None = None,
) -> OKResponse | ErrorResponse:
"""Trigger a Dag run via the API server."""
- body = TriggerDAGRunPayload(logical_date=logical_date, conf=conf or
{}, reset_dag_run=reset_dag_run)
+ body = TriggerDAGRunPayload(
+ logical_date=logical_date, conf=conf or {},
reset_dag_run=reset_dag_run, note=note
+ )
try:
self.client.post(
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 32824a48b21..190f833fc68 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -357,6 +357,7 @@ class TriggerDAGRunPayload(BaseModel):
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+ note: Annotated[str | None, Field(title="Note")] = None
class UpdateHITLDetailPayload(BaseModel):
@@ -628,6 +629,7 @@ class DagRun(BaseModel):
triggering_user_name: Annotated[str | None, Field(title="Triggering User
Name")] = None
consumed_asset_events: Annotated[list[AssetEventDagRunReference],
Field(title="Consumed Asset Events")]
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+ note: Annotated[str | None, Field(title="Note")] = None
class TIRunContext(BaseModel):
diff --git a/task-sdk/src/airflow/sdk/exceptions.py
b/task-sdk/src/airflow/sdk/exceptions.py
index 92185df5ced..9f8a5f11fbf 100644
--- a/task-sdk/src/airflow/sdk/exceptions.py
+++ b/task-sdk/src/airflow/sdk/exceptions.py
@@ -248,6 +248,7 @@ class DagRunTriggerException(AirflowException):
failed_states: list[str],
poke_interval: int,
deferrable: bool,
+ note: str | None = None,
):
super().__init__()
self.trigger_dag_id = trigger_dag_id
@@ -261,6 +262,7 @@ class DagRunTriggerException(AirflowException):
self.failed_states = failed_states
self.poke_interval = poke_interval
self.deferrable = deferrable
+ self.note = note
class DownstreamTasksSkipped(AirflowException):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b87131aa733..b0878f766e2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1371,11 +1371,7 @@ class ActivitySubprocess(WatchedSubprocess):
dump_opts = {"exclude_unset": True}
elif isinstance(msg, TriggerDagRun):
resp = self.client.dag_runs.trigger(
- msg.dag_id,
- msg.run_id,
- msg.conf,
- msg.logical_date,
- msg.reset_dag_run,
+ msg.dag_id, msg.run_id, msg.conf, msg.logical_date,
msg.reset_dag_run, msg.note
)
elif isinstance(msg, GetDagRun):
dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 7650bd56f27..3d641c36fe0 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1368,6 +1368,7 @@ def _handle_trigger_dag_run(
logical_date=drte.logical_date,
conf=drte.conf,
reset_dag_run=drte.reset_dag_run,
+ note=drte.note,
),
)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 6ce05fefa11..9215f00f26c 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2075,7 +2075,24 @@ REQUEST_TEST_CASES = [
expected_body={"ok": True, "type": "OKResponse"},
client_mock=ClientMock(
method_path="dag_runs.trigger",
- args=("test_dag", "test_run", {"key": "value"},
timezone.datetime(2025, 1, 1), True),
+ args=("test_dag", "test_run", {"key": "value"},
timezone.datetime(2025, 1, 1), True, None),
+ response=OKResponse(ok=True),
+ ),
+ test_id="dag_run_trigger",
+ ),
+ RequestTestCase(
+ message=TriggerDagRun(
+ dag_id="test_dag",
+ run_id="test_run",
+ conf={"key": "value"},
+ logical_date=timezone.datetime(2025, 1, 1),
+ reset_dag_run=True,
+ note="Test Note",
+ ),
+ expected_body={"ok": True, "type": "OKResponse"},
+ client_mock=ClientMock(
+ method_path="dag_runs.trigger",
+ args=("test_dag", "test_run", {"key": "value"},
timezone.datetime(2025, 1, 1), True, "Test Note"),
response=OKResponse(ok=True),
),
test_id="dag_run_trigger",
@@ -2085,7 +2102,7 @@ REQUEST_TEST_CASES = [
expected_body={"error": "DAGRUN_ALREADY_EXISTS", "detail": None,
"type": "ErrorResponse"},
client_mock=ClientMock(
method_path="dag_runs.trigger",
- args=("test_dag", "test_run", None, None, False),
+ args=("test_dag", "test_run", None, None, False, None),
response=ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS),
),
test_id="dag_run_trigger_already_exists",
@@ -2109,6 +2126,7 @@ REQUEST_TEST_CASES = [
"conf": None,
"triggering_user_name": None,
"type": "DagRunResult",
+ "note": None,
},
client_mock=ClientMock(
method_path="dag_runs.get_detail",
@@ -2159,6 +2177,7 @@ REQUEST_TEST_CASES = [
"clear_number": 0,
"conf": None,
"triggering_user_name": None,
+ "note": None,
},
"type": "PreviousDagRunResult",
},