ashb commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r1850542416


##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
         executor_config: Any | None = None,
     ) -> None:
         """Execute asynchronously."""
-        if TYPE_CHECKING:
-            assert self.impl
-
         self.validate_airflow_tasks_run_command(command)
+        self.activity_queue.put((key, command))
+        self._outstanding_messages += 1
+        self._check_workers(can_start=True)
+
+    def _check_workers(self, can_start: bool = True):
+        # Reap any dead workers
+        to_remove = set()
+        for pid, proc in self.workers.items():
+            if not proc.is_alive():
+                to_remove.add(pid)
+                proc.close()
+
+        if to_remove:
+            self.workers = {pid: proc for pid, proc in self.workers.items() if 
pid not in to_remove}
+
+        # If we're using spawn in multiprocessing (default on macos now) to 
start tasks, this can get called a
+        # via sync() a few times before the spawned process actually starts 
picking up messages. Try not to
+        # create too much
+
+        if self._outstanding_messages <= 0 or self.activity_queue.empty():
+            # Nothing to do, should we shut down idle workers?
+            return
 
-        self.impl.execute_async(key=key, command=command, queue=queue, 
executor_config=executor_config)
+        need_more_workers = len(self.workers) < self._outstanding_messages
+        if need_more_workers and (self.parallelism == 0 or len(self.workers) < 
self.parallelism):
+            self._spawn_worker()
+
+    def _spawn_worker(self):
+        p = multiprocessing.Process(
+            target=_run_worker,
+            kwargs={
+                "logger_name": self.log.name,
+                "input": self.activity_queue,
+                "output": self.result_queue,
+            },
+        )
+        p.start()
+        if TYPE_CHECKING:
+            assert p.pid  # Since we've called start
+        self.workers[p.pid] = p
 
     def sync(self) -> None:
         """Sync will get called periodically by the heartbeat method."""
-        if TYPE_CHECKING:
-            assert self.impl
+        self._read_results()
+        self._check_workers()
 
-        self.impl.sync()
+    def _read_results(self):
+        while not self.result_queue.empty():
+            key, state, exc = self.result_queue.get()
+            self._outstanding_messages = self._outstanding_messages - 1
+

Review Comment:
   I think this is fine actually -- the only way we can decrement is if the 
work runs to completion. If the process exits abnormally, it by definition 
couldn't have sent the completion message.
   
   So let's say that it picks up a message off the `activity_queue` and then dies.
   
   We'll never get a completion message on `result_queue`, so 
`_outstanding_messages` will be off by 1.
   
   Let me see if I can use a `JoinableQueue` instead (it has `task_done()`, 
which we might be able to use). Maybe. It depends on whether we can get at the 
count without forcing a join.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to