uranusjr commented on code in PR #59604:
URL: https://github.com/apache/airflow/pull/59604#discussion_r2674623994
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -887,6 +888,22 @@ def are_dependencies_met(
:param verbose: whether log details on failed dependencies on info or
debug log level
"""
dep_context = dep_context or DepContext()
+ if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
+ # This DepContext is used when a task instance is in
UP_FOR_RESCHEDULE state.
+ #
+ # Tasks can be put into UP_FOR_RESCHEDULE by the task runner
itself (e.g. when
+ # the worker cannot load the Dag or task). In this case, the
scheduler must respect
+ # the task instance's reschedule_date before scheduling it again.
+ #
+ # ReadyToRescheduleDep is the only dependency that enforces this
time-based gating.
+ # We therefore extend the normal scheduling dependency set with
it, instead of
+ # modifying the global scheduler dependencies.
+ dep_context = DepContext(
+ deps=dep_context.deps | {ReadyToRescheduleDep()},
+ flag_upstream_failed=dep_context.flag_upstream_failed,
+ ignore_unmapped_tasks=dep_context.ignore_unmapped_tasks,
+ finished_tis=dep_context.finished_tis,
+ )
Review Comment:
Can this not modify `dep_context` in-place?
```python
dep_context.deps.add(ReadyToRescheduleDep())
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]