This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit fcb784f798d76f4a1500377a6ea0f30187705131
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon Oct 5 14:16:08 2020 +0100

    When sending tasks to celery from a sub-process, reset signal handlers (#11278)

    Since these processes are spawned from SchedulerJob after it has
    registered its signal handlers, if any of them got signaled they would
    end up killing the ProcessorAgent's process group! (multiprocessing
    defaults to the fork start method on Linux, so the pool workers inherit
    all of the parent's state -- signal handlers, and access to the
    `_process.pid` inside the ProcessorAgent instance.)

    This behaviour is not what we want for these multiprocessing.Pool
    processes.

    This _may_ be a source of the long-standing "scheduler is alive but not
    scheduling any jobs" issue. Maybe.

    (cherry picked from commit baa980fbc83b25b5cf0700e567e69c2eb156412f)
---
 airflow/executors/celery_executor.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 128b25b..35b4e84 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -219,7 +219,14 @@ class CeleryExecutor(BaseExecutor):
         chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
         num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
 
-        send_pool = Pool(processes=num_processes)
+        def reset_signals():
+            # Since we are run from inside the SchedulerJob, we don't want to
+            # inherit the signal handlers that we registered there.
+            import signal
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+        send_pool = Pool(processes=num_processes, initializer=reset_signals)
         key_and_async_results = send_pool.map(
             send_task_to_executor,
             task_tuples_to_send,
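
The technique in the patch is plain multiprocessing behaviour rather than anything Airflow-specific: on Linux, Pool workers are forked and therefore inherit the parent's signal handlers, and a Pool `initializer` runs once in each worker and can restore the defaults there. Below is a minimal standalone sketch of that idea under the same assumptions; the function names (parent_sigterm_handler, reset_signals, square) are illustrative and not taken from the Airflow code.

import os
import signal
from multiprocessing import Pool


def parent_sigterm_handler(signum, frame):
    # Parent-only behaviour; we do not want forked Pool workers to run this.
    print("parent %s handling signal %s" % (os.getpid(), signum))


def reset_signals():
    # Runs once in each worker when the Pool starts it: drop the handlers
    # inherited via fork and restore the default dispositions.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)


def square(i):
    return i * i


if __name__ == "__main__":
    # The parent installs its own SIGTERM handler, as SchedulerJob does.
    signal.signal(signal.SIGTERM, parent_sigterm_handler)

    # Without initializer=reset_signals, workers forked on Linux would still
    # have parent_sigterm_handler installed and would react to signals as if
    # they were the parent process.
    with Pool(processes=2, initializer=reset_signals) as pool:
        print(pool.map(square, range(4)))

Passing the reset as a Pool initializer (rather than calling signal.signal inside each task function) keeps the reset to a single place and guarantees it runs before the worker picks up any work.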
