ashb commented on issue #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. URL: https://github.com/apache/airflow/pull/6627#issuecomment-558237708 This is what the change to get it working with multiprocessing looks like (give or take; this diff is against a different version). Do we think it's worth doing it like this? I will test the timing, but overall this should be about as fast as os.fork (give or take). ```diff diff --git airflow/task/task_runner/standard_task_runner.py airflow/task/task_runner/standard_task_runner.py index 87b046377..9b0ef162a 100644 --- airflow/task/task_runner/standard_task_runner.py +++ airflow/task/task_runner/standard_task_runner.py @@ -19,11 +19,13 @@ import os +import multiprocessing import psutil from setproctitle import setproctitle +from airflow.configuration import conf from airflow.task.task_runner.base_task_runner import BaseTaskRunner -from airflow.utils.helpers import reap_process_group +from airflow.utils.helpers import reap_process_group, DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM CAN_FORK = hasattr(os, 'fork') @@ -34,67 +36,106 @@ class StandardTaskRunner(BaseTaskRunner): """ def __init__(self, local_task_job): super().__init__(local_task_job) + self.mp_context = multiprocessing.get_context( + conf.get('core', 'exector_multiprocess_start_method', fallback=None) + ) self._rc = None + self._mp_process = None def start(self): - if CAN_FORK and not self.run_as_user: - self.process = self._start_by_fork() - else: + if self.run_as_user: self.process = self._start_by_exec() + else: + self.process = self._start_by_subprocess() def _start_by_exec(self): subprocess = self.run_command() return psutil.Process(subprocess.pid) - def _start_by_fork(self): - pid = os.fork() - if pid: - self.log.info("Started process %d to run task", pid) - return psutil.Process(pid) - else: - from airflow.bin.cli import get_parser - import signal - import airflow.settings as settings - - signal.signal(signal.SIGINT, signal.SIG_DFL) - 
signal.signal(signal.SIGTERM, signal.SIG_DFL) - # Start a new process group - os.setpgid(0, 0) - - # Force a new SQLAlchemy session. We can't share open DB handles - # between process. The cli code will re-create this as part of its - # normal startup - settings.engine.pool.dispose() - settings.engine.dispose() + def _start_by_subprocess(self): + self._mp_process = self.mp_context.Process( + target=type(self)._run_in_subprocess, + args=(self._command,), + daemon=False, + ) + self._mp_process.start() + + self.log.info("Started process %d to run task", self._mp_process.pid) + # Return a psutil.process so that we have the same type when doing run_as_user or this + return psutil.Process(self._mp_process.pid) + + @classmethod + def _run_in_subprocess(cls, command): + from airflow.bin.cli import get_parser + import signal + import airflow.settings as settings + + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + # Start a new process group + os.setpgid(0, 0) + + # Force a new SQLAlchemy session. We can't share open DB handles + # between process. 
The cli code will re-create this as part of its + # normal startup + settings.engine.pool.dispose() + settings.engine.dispose() + + parser = get_parser() + args = parser.parse_args(command[1:]) + + proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}" + if hasattr(args, "job_id"): + proc_title += " {0.job_id}" + setproctitle(proc_title.format(args)) - parser = get_parser() - args = parser.parse_args(self._command[1:]) - - proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}" - if hasattr(args, "job_id"): - proc_title += " {0.job_id}" - setproctitle(proc_title.format(args)) - - try: - args.func(args) - os._exit(0) - except Exception: - os._exit(1) + try: + args.func(args) + os._exit(0) + except Exception: + os._exit(1) def return_code(self, timeout=0): # We call this multiple times, but we can only wait on the process once - if self._rc is not None or not self.process: + if self._rc is not None: return self._rc - try: - self._rc = self.process.wait(timeout=timeout) - self.process = None - except psutil.TimeoutExpired: - pass + if self._mp_process: + self._mp_process.join(timeout) + self._rc = self._mp_process.exitcode + + if self._rc is not None: + self._mp_process.close() + self._mp_process = None + self._process = None + elif self.process: + try: + self._rc = self.process.wait(timeout=timeout) + self.process = None + except psutil.TimeoutExpired: + pass return self._rc def terminate(self): + if self._mp_process: + pgid = os.getpgid(self._mp_process.pid) + + self._mp_process.terminate() + self._rc = self._mp_process.exitcode + if self._rc is None: + self._mp_process.join(DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM) + self._mp_process.kill() + self._mp_process.join(0) + self._rc = self._mp_process.exitcode + + self._mp_process.close() + self.process = None + self._mp_process = None + + # Make sure we kill and reap any sub-processes that are left! 
+ reap_process_group(pgid, self.log) + if self.process: if self.process.is_running(): rcs = reap_process_group(self.process.pid, self.log) ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
