ashb commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r1850603063
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
Review Comment:
Updated the comment. I don't think this is a big enough thing to create a GH
issue for tbh
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]