dabla commented on code in PR #55068:
URL: https://github.com/apache/airflow/pull/55068#discussion_r2802760752
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1429,6 +1429,123 @@ def update_heartbeat(self):
.values(last_heartbeat_at=timezone.utcnow())
)
+ @property
+ def start_trigger_args(self) -> StartTriggerArgs | None:
+ if self.task and self.task.start_from_trigger is True:
+ return self.task.start_trigger_args
+ return None
+
+ # TODO: We have some code duplication here and in the
_create_ti_state_update_query_and_update_state
+ # method of the task_instances module in the execution api when a
TIDeferredStatePayload is being
+ # processed. This is because of a TaskInstance being updated
differently using SQLAlchemy.
+ # If we use the approach from the execution api as common code in
the DagRun schedule_tis method,
+ # the side effect is the changes done to the task instance aren't
picked up by the scheduler and
+ # thus the task instance isn't processed until the scheduler is
restarted.
+ @provide_session
+ def defer_task(self, session: Session = NEW_SESSION) -> bool:
Review Comment:
The defer_Task is being invoked in the
[schedule_tis](https://github.com/apache/airflow/pull/55068/changes#diff-aa0338f81a481ca6bc69703521c531d593558d4347a958dc00e5f9689d6802a1R2066)
method of the DagRun class, this is what makes the direct execution on the
trigger possible with start_from_trigger.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]