ashb commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r1850602344
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
+ return
- self.impl.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
+ need_more_workers = len(self.workers) < self._outstanding_messages
+ if need_more_workers and (self.parallelism == 0 or len(self.workers) <
self.parallelism):
+ self._spawn_worker()
+
+ def _spawn_worker(self):
+ p = multiprocessing.Process(
+ target=_run_worker,
+ kwargs={
+ "logger_name": self.log.name,
+ "input": self.activity_queue,
+ "output": self.result_queue,
+ },
+ )
+ p.start()
+ if TYPE_CHECKING:
+ assert p.pid # Since we've called start
+ self.workers[p.pid] = p
def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
- if TYPE_CHECKING:
- assert self.impl
+ self._read_results()
+ self._check_workers()
- self.impl.sync()
+ def _read_results(self):
+ while not self.result_queue.empty():
+ key, state, exc = self.result_queue.get()
+ self._outstanding_messages = self._outstanding_messages - 1
+
Review Comment:
In 2f7d6c53d2 I've switched it to use a synchronized integer via
`multiprocessing.Value` that we can pass to the worker processes, so each
worker can decrement the count as soon as it reads a message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]