anishgirianish commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r3025670031


##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -305,10 +315,24 @@ def heartbeat(self) -> None:
         self._emit_metrics(open_slots, num_running_workloads, num_queued_workloads)
         self.trigger_tasks(open_slots)
 
+        self.trigger_connection_tests()
+
         # Calling child class sync method
         self.log.debug("Calling the %s sync method", self.__class__)
         self.sync()
 
+    def trigger_connection_tests(self) -> None:
+        """Process queued connection tests, respecting available slot capacity."""
+        if not self.supports_connection_test or not self.queued_connection_tests:
+            return
+
+        available = self.slots_available
+        if available <= 0:
+            return
+
+        tests_to_run = list(self.queued_connection_tests.values())[:available]
+        self._process_workloads(tests_to_run)

Review Comment:
   This is not a deadlock. `slots_available` is the capacity model: it correctly accounts for all queued work (tasks, callbacks, connection tests). When slots are full, the function returns early, which is the intended behavior; connection tests are processed once slots free up after tasks complete. The dispatch method in the scheduler (`_dispatch_connection_tests`) computes its own budget from `parallelism - len(running)` and picks up PENDING rows independently.
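   To make the capacity argument concrete, here is a minimal, self-contained sketch of the slot-capped dispatch described above. The class and its attributes are simplified stand-ins for the real Airflow executor internals (not the actual `BaseExecutor`); `slots_available` models the `parallelism - len(running)` budget:

```python
class ExecutorSketch:
    """Toy model of the slot-capped connection-test dispatch (not real Airflow code)."""

    def __init__(self, parallelism: int = 4) -> None:
        self.parallelism = parallelism
        self.running: set[str] = set()              # workloads currently occupying slots
        self.queued_connection_tests: dict[str, str] = {}  # key -> queued test workload
        self.supports_connection_test = True
        self.dispatched: list[str] = []             # records what got sent for execution

    @property
    def slots_available(self) -> int:
        # The capacity model: slots left after accounting for running work.
        return self.parallelism - len(self.running)

    def _process_workloads(self, workloads: list[str]) -> None:
        # Stand-in for handing workloads to the executor backend.
        self.dispatched.extend(workloads)

    def trigger_connection_tests(self) -> None:
        """Process queued connection tests, respecting available slot capacity."""
        if not self.supports_connection_test or not self.queued_connection_tests:
            return

        available = self.slots_available
        if available <= 0:
            # Not a deadlock: tests simply wait for a later heartbeat,
            # when completed tasks have freed slots.
            return

        tests_to_run = list(self.queued_connection_tests.values())[:available]
        self._process_workloads(tests_to_run)
```

   With `parallelism=4` and three running workloads, only one queued test is dispatched this heartbeat; with zero free slots, none are, and the rest are retried on subsequent heartbeats.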




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
