Copilot commented on code in PR #64068:
URL: https://github.com/apache/airflow/pull/64068#discussion_r3025338559
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -273,6 +273,12 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
handle_event_submit(event, task_instance=task_instance, session=session)
# Send an event to assets
+ if event.xcoms:
+     log.warning(
+         "Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s",
+         trigger_id,
+         event.xcoms,
Review Comment:
The warning logs the full `event.xcoms` payload. Since XComs can contain
sensitive or large data, this can leak secrets into logs and create noisy log
volume. Consider logging only the presence/keys (or a count), and only emitting
this warning when the trigger actually has asset/callback associations to
notify (otherwise this will warn even for task-only triggers).
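
One way to read this suggestion, as a minimal standalone sketch: the helper `warn_about_dropped_xcoms` and its `has_asset_or_callback` flag are hypothetical names used for illustration and are not part of Airflow. It logs only the count and the keys, and stays silent when there is nothing to notify.

```python
import logging

log = logging.getLogger("trigger_sketch")


def warn_about_dropped_xcoms(trigger_id, xcoms, has_asset_or_callback):
    """Hypothetical helper: warn about XComs that will not be forwarded.

    Returns True if a warning was emitted. Only keys and a count are
    logged, never the values, so secrets and large payloads stay out
    of the logs.
    """
    # Nothing to warn about for task-only triggers or empty payloads.
    if not xcoms or not has_asset_or_callback:
        return False
    log.warning(
        "Trigger event %s contains %d XCom value(s), which cannot be sent "
        "to assets or callbacks. XCom keys: %s",
        trigger_id,
        len(xcoms),
        sorted(xcoms),
    )
    return True
```

In `submit_event`, the flag would presumably be derived from whether the trigger has any associated assets or callbacks; how that is determined is left open here.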
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -486,6 +495,11 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
# re-serialize the entire dict using serde to ensure consistent structure
task_instance.next_kwargs = serialize(next_kwargs)
+ # Push XCom values if provided by the trigger
+ if event.xcoms:
+     for key, value in event.xcoms.items():
+         task_instance.xcom_push(key=key, value=value, session=session)
+
Review Comment:
Pushing XComs one-by-one can be expensive: `task_instance.xcom_push()` calls
`XComModel.set()` which performs a `DagRun.id` lookup plus delete/insert for
each key. If `event.xcoms` can contain multiple keys, consider batching this
(e.g., resolve `dag_run_id` once and insert multiple rows in one operation) to
avoid repeated queries and write amplification.
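
The batching idea can be sketched outside Airflow with the standard-library `sqlite3` module. Everything below is an assumption made for illustration: the `dag_run` and `xcom` tables, their columns, and the functions `push_xcoms_batched` and `make_demo_db` are stand-ins, not Airflow's schema or the `XComModel` API. The point is the shape: one run lookup, one delete, one bulk insert, all in a single transaction.

```python
import json
import sqlite3


def push_xcoms_batched(conn, dag_id, run_id, task_id, xcoms):
    """Hypothetical batch push; returns the number of keys written."""
    if not xcoms:
        return 0
    # Resolve the run's primary key once instead of once per XCom key.
    row = conn.execute(
        "SELECT id FROM dag_run WHERE dag_id = ? AND run_id = ?",
        (dag_id, run_id),
    ).fetchone()
    if row is None:
        raise ValueError(f"run {run_id!r} not found for DAG {dag_id!r}")
    dag_run_id = row[0]
    keys = list(xcoms)
    placeholders = ", ".join("?" for _ in keys)
    with conn:  # commit on success, roll back on error
        # Clear any previous values for these keys in one statement.
        conn.execute(
            "DELETE FROM xcom WHERE dag_run_id = ? AND task_id = ? "
            f"AND xcom_key IN ({placeholders})",
            (dag_run_id, task_id, *keys),
        )
        # Insert all rows in one bulk operation.
        conn.executemany(
            "INSERT INTO xcom (dag_run_id, task_id, xcom_key, xcom_value) "
            "VALUES (?, ?, ?, ?)",
            [(dag_run_id, task_id, k, json.dumps(v)) for k, v in xcoms.items()],
        )
    return len(keys)


def make_demo_db():
    """Build an in-memory database with the stand-in schema."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, run_id TEXT)"
    )
    conn.execute(
        "CREATE TABLE xcom (dag_run_id INTEGER, task_id TEXT, "
        "xcom_key TEXT, xcom_value TEXT)"
    )
    conn.execute("INSERT INTO dag_run (dag_id, run_id) VALUES ('demo', 'run_1')")
    conn.commit()
    return conn
```

With this shape the number of statements stays constant regardless of how many keys `event.xcoms` carries, whereas the per-key loop grows linearly.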
##########
airflow-core/docs/authoring-and-scheduling/deferring.rst:
##########
@@ -445,12 +445,14 @@ Triggers can have two options: they can either send execution back to the worker
     async def run(self) -> AsyncIterator[TriggerEvent]:
         await asyncio.sleep(self.duration.total_seconds())
         if self.end_from_trigger:
-            yield TaskSuccessEvent()
+            yield TaskSuccessEvent(xcoms={"wait_duration_s": self.duration.total_seconds()})
         else:
             yield TriggerEvent({"duration": self.duration})
In the above example, the trigger will end the task instance directly if
``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``.
Otherwise, it will resume the task instance with the method specified in the
operator.
+Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.
Review Comment:
The docs refer to `TaskFailureEvent`, but the actual event class is
`TaskFailedEvent` (see `airflow/triggers/base.py`). This mismatch can confuse
users copying the example; please update the name here (and elsewhere in this
section if applicable).
```suggestion
Note also that in case of direct exit, an XCom can be produced and passed
with the ``TaskSuccessEvent`` or ``TaskFailedEvent``. This XCom will be pushed
when the task instance is marked as success or failure.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.