jedcunningham commented on code in PR #31808:
URL: https://github.com/apache/airflow/pull/31808#discussion_r1223635163
##########
airflow/models/trigger.py:
##########
@@ -150,14 +154,31 @@ def submit_event(cls, trigger_id, event, session: Session
= NEW_SESSION) -> None
for task_instance in session.query(TaskInstance).filter(
TaskInstance.trigger_id == trigger_id, TaskInstance.state ==
TaskInstanceState.DEFERRED
):
- # Add the event's payload into the kwargs for the task
- next_kwargs = task_instance.next_kwargs or {}
- next_kwargs["event"] = event.payload
- task_instance.next_kwargs = next_kwargs
- # Remove ourselves as its trigger
- task_instance.trigger_id = None
- # Finally, mark it as scheduled so it gets re-queued
- task_instance.state = TaskInstanceState.SCHEDULED
+ if isinstance(event, BaseTaskEndEvent):
+ # task will be marked with terminal state and will not resume
on worker
+ task_instance.trigger_id = None
+ task_instance.state = event.task_instance_state
+ if event.xcom_return:
+ task_instance.xcom_push(key=XCOM_RETURN_KEY,
value=event.xcom_return)
+ if event.other_xcom:
+ for key, value in event.other_xcom.items():
+ if key == XCOM_RETURN_KEY:
+ log.warning(
+ "Trigger yielded `other_xcom` with reserved
key %s; ignoring. ti=%s",
+ XCOM_RETURN_KEY,
+ task_instance,
+ )
Review Comment:
(There isn't an else though?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]