This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ca679720653242d3c656d41fb93d61a5c7c79bb8
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Aug 1 20:45:01 2021 +0200

    Moves SchedulerJob initialization to within daemon context (#17157)

    In the Scheduler, the SchedulerJob was instantiated before the daemon
    context was activated. SchedulerJob is a SQLAlchemy ORM object, and
    instantiating it opens a connection to Postgres. Activating the daemon
    context forks the process under the hood, and while some of the open
    file descriptors were passed to the fork (stdin and stderr, but also
    the open log file handle), the established DB connection socket was
    not. As a result, starting the scheduler with the --daemonize flag
    raised the error `SSL SYSCALL error: Socket operation on non-socket`.

    This PR moves the SchedulerJob initialization to within the daemon
    context, so the connection to Postgres is opened only after the
    process has been forked and daemonized.

    Fixes: #17120
    (cherry picked from commit e8fc3acfd9884312669c1d85b71f42a9aab29cf8)
---
 airflow/cli/commands/scheduler_command.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 368db6f..44674f0 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -29,17 +29,21 @@ from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler
 
 
+def _create_scheduler_job(args):
+    job = SchedulerJob(
+        subdir=process_subdir(args.subdir),
+        num_runs=args.num_runs,
+        do_pickle=args.do_pickle,
+    )
+    return job
+
+
 @cli_utils.action_logging
 def scheduler(args):
     """Starts Airflow Scheduler"""
     skip_serve_logs = args.skip_serve_logs
 
     print(settings.HEADER)
-    job = SchedulerJob(
-        subdir=process_subdir(args.subdir),
-        num_runs=args.num_runs,
-        do_pickle=args.do_pickle,
-    )
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -54,9 +58,11 @@ def scheduler(args):
             stderr=stderr_handle,
         )
         with ctx:
+            job = _create_scheduler_job(args)
             sub_proc = _serve_logs(skip_serve_logs)
             job.run()
     else:
+        job = _create_scheduler_job(args)
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
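A note on the mechanism (not part of the commit): python-daemon's
DaemonContext forks the process and closes every file descriptor not
listed in files_preserve, which is why a DB socket opened before
entering the context stops working afterwards. The sketch below is a
minimal, standalone illustration of that ordering, assuming the
python-daemon package is installed; the socketpair is a hypothetical
stand-in for the Postgres connection SchedulerJob opens, and the path
/tmp/daemon_demo.log is an arbitrary file used so the detached process
can report results.

    # Sketch only: demonstrates why descriptors opened before
    # DaemonContext do not survive daemonization.
    import socket

    import daemon

    # Opened in the parent, *before* daemonizing: DaemonContext will
    # close this descriptor during the fork, the same fate the
    # SchedulerJob's DB connection suffered before the fix.
    early_sock, _ = socket.socketpair()

    log = open("/tmp/daemon_demo.log", "w")

    with daemon.DaemonContext(files_preserve=[log]):
        # Test the pre-fork socket first, before its fd number can be
        # reused by anything created inside the daemon.
        try:
            early_sock.send(b"ping")
        except OSError as err:  # e.g. "Bad file descriptor", akin to the
            # reported "SSL SYSCALL error: Socket operation on non-socket"
            log.write(f"early socket is dead: {err}\n")

        # Opened *after* the fork, inside the daemon: fully usable,
        # which is why the commit calls _create_scheduler_job here.
        late_sock, peer = socket.socketpair()
        late_sock.send(b"ping")
        log.write(f"late socket works: {peer.recv(4)!r}\n")
        log.flush()

Running this and inspecting /tmp/daemon_demo.log should show the early
socket failing with a bad-descriptor error while the late one works,
mirroring why SchedulerJob is now created inside the daemon context.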
