ferruzzi commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2893084458


##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -240,10 +242,18 @@ def queue_workload(self, workload: workloads.All, session: Session) -> None:
                     f"See LocalExecutor or CeleryExecutor for reference implementation."
                 )
             self.queued_callbacks[workload.callback.id] = workload
+        elif isinstance(workload, workloads.TestConnection):
+            if not self.supports_connection_test:
+                raise NotImplementedError(
+                    f"{type(self).__name__} does not support TestConnection workloads. "
+                    f"Set supports_connection_test = True and implement connection test handling "
+                    f"in _process_workloads(). See LocalExecutor for reference implementation."
+                )
+            self.queued_connection_tests[str(workload.connection_test_id)] = workload
         else:
             raise ValueError(
                 f"Un-handled workload type {type(workload).__name__!r} in {type(self).__name__}. "
-                f"Workload must be one of: ExecuteTask, ExecuteCallback."
+                f"Workload must be one of: ExecuteTask, ExecuteCallback, TestConnection."

Review Comment:
   Non-blocking future work, unless you are already making more changes here: we'll eventually want to generate this list from `ExecutorWorkload` (by unpacking it, or something similar) instead of manually adding each new workload type.
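
   A minimal sketch of that idea, assuming `ExecutorWorkload` is a plain `typing.Union` of the workload classes (the real definition may differ). The three classes below are empty stand-ins, and both helper functions are hypothetical names used only for illustration:

```python
from typing import Union, get_args


# Empty stand-ins for the real workload classes (assumption for this sketch).
class ExecuteTask: ...
class ExecuteCallback: ...
class TestConnection: ...


# Assumed shape of the union; adding a new member here is the only change needed.
ExecutorWorkload = Union[ExecuteTask, ExecuteCallback, TestConnection]


def supported_workload_names() -> str:
    """Build the comma-separated list of names from the union's members."""
    return ", ".join(cls.__name__ for cls in get_args(ExecutorWorkload))


def unhandled_workload_message(workload: object, executor: object) -> str:
    """Hypothetical helper producing the error text without a hard-coded list."""
    return (
        f"Un-handled workload type {type(workload).__name__!r} in {type(executor).__name__}. "
        f"Workload must be one of: {supported_workload_names()}."
    )
```

   If the real union is wrapped (for example in `Annotated`), the unpacking step would need one more level, so treat this as a starting point only.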



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -573,13 +592,24 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
 
     @property
     def slots_available(self):
-        """Number of new workloads (tasks and callbacks) this executor instance can accept."""
-        return self.parallelism - len(self.running) - len(self.queued_tasks) - len(self.queued_callbacks)
+        """Number of new workloads (tasks, callbacks, and connection tests) this executor instance can accept."""
+        return (
+            self.parallelism
+            - len(self.running)
+            - len(self.queued_tasks)
+            - len(self.queued_callbacks)
+            - len(self.queued_connection_tests)
+        )
 

Review Comment:
   Non-blocking future work idea: I considered this when I was in here adding the callback workload, but decided it wasn't worth the refactor at the time; maybe we're getting close to that time. I wonder if we should consider (again, not now) an alternative data object for the queues.
   
   If we did something like `executor_queue: dict[workload_type, workload_queue_type]` and added a `workload_queue_type` field and type hint to each workload, the same way as the `type` field, then we could do `executor_queue[workload.type].update(new_entry)`, and `slots_available` and `slots_occupied` would simplify to iterating over `executor_queue` and summing the lengths.
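
   Roughly what I have in mind, as a sketch only. `SketchWorkload` and `SketchExecutor` are made-up stand-ins, the `key` field is an assumption, and I am assuming `slots_occupied` counts running plus queued workloads:

```python
from dataclasses import dataclass, field


@dataclass
class SketchWorkload:
    """Stand-in workload: `type` picks the queue, `key` identifies the entry."""

    type: str
    key: str


@dataclass
class SketchExecutor:
    """Stand-in executor holding one queue per workload type."""

    parallelism: int
    running: set[str] = field(default_factory=set)
    # Maps workload type -> {key: workload}; replaces the separate queued_* dicts.
    executor_queue: dict[str, dict[str, SketchWorkload]] = field(default_factory=dict)

    def queue_workload(self, workload: SketchWorkload) -> None:
        # A new workload type needs no new attribute, only a new dict entry.
        self.executor_queue.setdefault(workload.type, {}).update({workload.key: workload})

    @property
    def slots_occupied(self) -> int:
        # Sum the lengths of every queue instead of naming each one.
        return len(self.running) + sum(len(q) for q in self.executor_queue.values())

    @property
    def slots_available(self) -> int:
        return self.parallelism - self.slots_occupied
```

   With this shape, adding a workload type would not touch either property.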
   
   
   Anyway, like I said, absolutely not blocking but I wanted to get the idea 
out since it was starting to look relevant.



