This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f2108892e8 Catch arbitrary exception from run_job to prevent zombie scheduler (#32707)
f2108892e8 is described below
commit f2108892e89085f695f8a3f52e076b39288497c6
Author: Bruce <[email protected]>
AuthorDate: Tue Jul 25 15:02:01 2023 -0700
Catch arbitrary exception from run_job to prevent zombie scheduler (#32707)
---
airflow/cli/commands/scheduler_command.py | 8 +++++++-
tests/cli/commands/test_scheduler_command.py | 24 ++++++++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 808645282c..ad0063ffa3 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -17,6 +17,7 @@
"""Scheduler command."""
from __future__ import annotations
+import logging
import signal
from contextlib import contextmanager
from multiprocessing import Process
@@ -35,12 +36,17 @@ from airflow.utils.cli import process_subdir, setup_locations, setup_logging, si
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.scheduler_health import serve_health_check
+log = logging.getLogger(__name__)
+
def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) -> None:
InternalApiConfig.force_database_direct_access()
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check):
- run_job(job=job_runner.job, execute_callable=job_runner._execute)
+ try:
+ run_job(job=job_runner.job, execute_callable=job_runner._execute)
+ except Exception:
+ log.exception("Exception when running scheduler job")
@cli_utils.action_cli
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index 2404b3b27c..1b66cc7b85 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -139,6 +139,30 @@ class TestSchedulerCommand:
with pytest.raises(AssertionError):
mock_process.assert_has_calls([mock.call(target=serve_health_check)])
+ @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
+ @mock.patch("airflow.cli.commands.scheduler_command.Process")
+ @mock.patch("airflow.cli.commands.scheduler_command.run_job", side_effect=Exception("run_job failed"))
+ @mock.patch("airflow.cli.commands.scheduler_command.log")
+ def test_run_job_exception_handling(
+ self,
+ mock_log,
+ mock_run_job,
+ mock_process,
+ mock_scheduler_job,
+ ):
+ args = self.parser.parse_args(["scheduler"])
+ scheduler_command.scheduler(args)
+
+ # Make sure that run_job is called, that the exception has been logged, and that the serve_logs
+ # sub-process has been terminated
+ mock_run_job.assert_called_once_with(
+ job=mock_scheduler_job().job,
+ execute_callable=mock_scheduler_job()._execute,
+ )
+ mock_log.exception.assert_called_once_with("Exception when running scheduler job")
+ mock_process.assert_called_once_with(target=serve_logs)
+ mock_process().terminate.assert_called_once_with()
+
# Creating MockServer subclass of the HealthServer handler so that we can test the do_GET logic
class MockServer(HealthServer):