o-nikolas commented on code in PR #42048:
URL: https://github.com/apache/airflow/pull/42048#discussion_r1773758245
##########
airflow/providers/edge/executors/edge_executor.py:
##########
@@ -151,6 +151,18 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:  # p
         """
         raise NotImplementedError()
+    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
+        """
+        Try to adopt running task instances that have been abandoned by a SchedulerJob dying.
+
+        Anything that is not adopted will be cleared by the scheduler (and then become eligible for
+        re-scheduling)
+
+        :return: any TaskInstances that were unable to be adopted
+        """
+        # We handle all running tasks from the DB in sync, no adoption logic needed.
Review Comment:
Wait, I'm only now realizing this. So every executor (in a multiple
scheduler/HA environment) is reading all tasks across the airflow cluster? This
is another unusual side effect of how you've implemented this executor. Usually
executors maintain a queue of tasks that they've received from _only_ their
scheduler, but in this case your executor implementation is going to be
stepping over the toes of every other instance of the edge executor in an HA
environment.
Is this intended?
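For context, the contract described above is that each executor adopts only the orphaned task instances it recognizes as its own and returns the rest, so the scheduler can clear and reschedule them. A minimal sketch of that contract, using a hypothetical `SketchExecutor` and a stand-in `TaskInstance` (neither is the real Airflow class; ownership is modeled by a set of known external executor IDs):

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class TaskInstance:
    # Stand-in for airflow.models.TaskInstance (illustrative only)
    key: str
    external_executor_id: Optional[str] = None


class SketchExecutor:
    """Hypothetical executor showing the usual per-scheduler adoption logic."""

    def __init__(self, known_ids: set):
        # IDs of work items this executor instance actually owns
        self.known_ids = known_ids
        self.running: set = set()

    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        not_adopted = []
        for ti in tis:
            if ti.external_executor_id in self.known_ids:
                # This task belongs to us: re-track it under this executor
                self.running.add(ti.key)
            else:
                # Not ours: hand it back so the scheduler clears/reschedules it
                not_adopted.append(ti)
        return not_adopted
```

An implementation that instead reads all running tasks from the DB, as the diff above does, effectively skips this filtering step, which is what the comment is questioning in an HA setup.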
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]