Lee-W commented on code in PR #59604:
URL: https://github.com/apache/airflow/pull/59604#discussion_r2675034154


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -887,6 +888,22 @@ def are_dependencies_met(
         :param verbose: whether log details on failed dependencies on info or 
debug log level
         """
         dep_context = dep_context or DepContext()
+        if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
+            # This DepContext is used when a task instance is in 
UP_FOR_RESCHEDULE state.
+            #
+            # Tasks can be put into UP_FOR_RESCHEDULE by the task runner 
itself (e.g. when
+            # the worker cannot load the Dag or task). In this case, the 
scheduler must respect
+            # the task instance's reschedule_date before scheduling it again.
+            #
+            # ReadyToRescheduleDep is the only dependency that enforces this 
time-based gating.
+            # We therefore extend the normal scheduling dependency set with 
it, instead of
+            # modifying the global scheduler dependencies.
+            dep_context = DepContext(
+                deps=dep_context.deps | {ReadyToRescheduleDep()},
+                flag_upstream_failed=dep_context.flag_upstream_failed,
+                ignore_unmapped_tasks=dep_context.ignore_unmapped_tasks,
+                finished_tis=dep_context.finished_tis,
+            )

Review Comment:
   Yes, this works fine. I tried to do it another way and failed... just 
updated it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to