This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 363358595438b4ecbd526ab0bd3a9e0568dabba8 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Mon Oct 5 14:16:08 2020 +0100 When sending tasks to celery from a sub-process, reset signal handlers (#11278) Since these processes are spawned from SchedulerJob after it has registered it's signals, if any of them got signaled they would have the behaviour of killing the ProcessorAgent process group! (MP has a default spawn of fork on Linux, so they inherit all previous state -- signals, and access to the `_process.pid` inside the ProcessorAgent instance) This behaviour is not what we want for these multiprocess.Pool processes. This _may_ be a source of the long-standing "scheduler is alive but not scheduling any jobs. Maybe. (cherry picked from commit baa980fbc83b25b5cf0700e567e69c2eb156412f) --- airflow/executors/celery_executor.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 128b25b..35b4e84 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -219,7 +219,14 @@ class CeleryExecutor(BaseExecutor): chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send)) num_processes = min(len(task_tuples_to_send), self._sync_parallelism) - send_pool = Pool(processes=num_processes) + def reset_signals(): + # Since we are run from inside the SchedulerJob, we don't to + # inherit the signal handlers that we registered there. + import signal + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + send_pool = Pool(processes=num_processes, initializer=reset_signals) key_and_async_results = send_pool.map( send_task_to_executor, task_tuples_to_send,
