anishgirianish commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r3025670031
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -305,10 +315,24 @@ def heartbeat(self) -> None:
         self._emit_metrics(open_slots, num_running_workloads, num_queued_workloads)
         self.trigger_tasks(open_slots)
+        self.trigger_connection_tests()
+
         # Calling child class sync method
         self.log.debug("Calling the %s sync method", self.__class__)
         self.sync()
+
+    def trigger_connection_tests(self) -> None:
+        """Process queued connection tests, respecting available slot capacity."""
+        if not self.supports_connection_test or not self.queued_connection_tests:
+            return
+
+        available = self.slots_available
+        if available <= 0:
+            return
+
+        tests_to_run = list(self.queued_connection_tests.values())[:available]
+        self._process_workloads(tests_to_run)
Review Comment:
This is not a deadlock. `slots_available` is the capacity model: it correctly
accounts for all queued work (tasks, callbacks, connection tests). When slots
are full, the function returns early, which is the intended behavior.
Connection tests are processed once slots free up after tasks complete. The
scheduler-side dispatch method (`_dispatch_connection_tests`) computes its own
budget from `parallelism - len(running)` and picks up PENDING rows independently.
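To illustrate the capacity argument, here is a minimal sketch of the two independent budget computations described above. The helper names (`executor_budget`, `scheduler_dispatch_budget`, `pick_tests_to_run`) and their signatures are hypothetical stand-ins for discussion, not the actual Airflow code:

```python
# Hypothetical sketch: two independent slot budgets, as described in the
# review comment above. Names and signatures are illustrative only.

def executor_budget(parallelism: int, running: int, queued: int) -> int:
    """Executor-side capacity (analogue of slots_available): total
    parallelism minus everything already running or queued."""
    return max(0, parallelism - running - queued)

def scheduler_dispatch_budget(parallelism: int, running_tests: int) -> int:
    """Scheduler-side budget computed independently by the dispatcher,
    mirroring `parallelism - len(running)`."""
    return max(0, parallelism - running_tests)

def pick_tests_to_run(queued_tests: dict, available: int) -> list:
    """Take at most `available` queued connection tests, as in the diff:
    when no slots are free, nothing is picked up (early return, no deadlock,
    since completing tasks later raises `available` again)."""
    if available <= 0:
        return []
    return list(queued_tests.values())[:available]
```

The point of the sketch: because both budgets floor at zero and are recomputed on every heartbeat, a full executor only defers connection tests until slots free up; it never blocks them permanently.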
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]