o-nikolas commented on code in PR #42048:
URL: https://github.com/apache/airflow/pull/42048#discussion_r1773758245


##########
airflow/providers/edge/executors/edge_executor.py:
##########
@@ -151,6 +151,18 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:  # p
         """
         raise NotImplementedError()
 
+    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
+        """
+        Try to adopt running task instances that have been abandoned by a SchedulerJob dying.
+
+        Anything that is not adopted will be cleared by the scheduler (and then become eligible for
+        re-scheduling)
+
+        :return: any TaskInstances that were unable to be adopted
+        """
+        # We handle all running tasks from the DB in sync, no adoption logic needed.

Review Comment:
   Wait, I'm only now realizing this. So every executor (in a multi-scheduler/HA environment) is reading all tasks across the Airflow cluster? This is another unusual side effect of how you've implemented this executor. Usually an executor maintains a queue of only the tasks it has received from _its own_ scheduler, but in this case your executor implementation is going to be stepping on the toes of every other Edge Executor instance in an HA environment.
   
   Is this intended?
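   For context, the usual pattern the reviewer is describing looks roughly like the sketch below: each executor instance only adopts orphaned task instances whose keys it is itself tracking, and hands everything else back to the scheduler to clear. This is a hedged illustration, not Airflow's actual `BaseExecutor` code; the `running` set and the minimal class are assumptions introduced for the example.
   
   ```python
   from typing import Sequence
   
   class SketchExecutor:
       """Minimal sketch of per-scheduler adoption filtering (hypothetical, not Airflow's base class)."""
   
       def __init__(self) -> None:
           # Keys of task instances that THIS executor instance queued itself.
           self.running: set = set()
   
       def try_adopt_task_instances(self, tis: Sequence) -> Sequence:
           not_adopted = []
           for ti in tis:
               if ti.key in self.running:
                   # Reclaim our own orphaned task; another scheduler's tasks are not touched.
                   continue
               not_adopted.append(ti)
           # Anything returned here is cleared by the scheduler and rescheduled.
           return not_adopted
   ```
   
   Under this contract, an executor that returns only the tasks it does not own avoids interfering with other scheduler/executor pairs in an HA deployment.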



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
