This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f2108892e8 Catch arbitrary exception from run_job to prevent zombie scheduler (#32707)
f2108892e8 is described below

commit f2108892e89085f695f8a3f52e076b39288497c6
Author: Bruce <[email protected]>
AuthorDate: Tue Jul 25 15:02:01 2023 -0700

    Catch arbitrary exception from run_job to prevent zombie scheduler (#32707)
---
 airflow/cli/commands/scheduler_command.py    |  8 +++++++-
 tests/cli/commands/test_scheduler_command.py | 24 ++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 808645282c..ad0063ffa3 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -17,6 +17,7 @@
 """Scheduler command."""
 from __future__ import annotations
 
+import logging
 import signal
 from contextlib import contextmanager
 from multiprocessing import Process
@@ -35,12 +36,17 @@ from airflow.utils.cli import process_subdir, setup_locations, setup_logging, si
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 from airflow.utils.scheduler_health import serve_health_check
 
+log = logging.getLogger(__name__)
+
 
 def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) -> None:
     InternalApiConfig.force_database_direct_access()
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
     with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check):
-        run_job(job=job_runner.job, execute_callable=job_runner._execute)
+        try:
+            run_job(job=job_runner.job, execute_callable=job_runner._execute)
+        except Exception:
+            log.exception("Exception when running scheduler job")
 
 
 @cli_utils.action_cli
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index 2404b3b27c..1b66cc7b85 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -139,6 +139,30 @@ class TestSchedulerCommand:
         with pytest.raises(AssertionError):
             mock_process.assert_has_calls([mock.call(target=serve_health_check)])
 
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    @mock.patch("airflow.cli.commands.scheduler_command.run_job", side_effect=Exception("run_job failed"))
+    @mock.patch("airflow.cli.commands.scheduler_command.log")
+    def test_run_job_exception_handling(
+        self,
+        mock_log,
+        mock_run_job,
+        mock_process,
+        mock_scheduler_job,
+    ):
+        args = self.parser.parse_args(["scheduler"])
+        scheduler_command.scheduler(args)
+
+        # Make sure that run_job is called, that the exception has been logged, and that the serve_logs
+        # sub-process has been terminated
+        mock_run_job.assert_called_once_with(
+            job=mock_scheduler_job().job,
+            execute_callable=mock_scheduler_job()._execute,
+        )
+        mock_log.exception.assert_called_once_with("Exception when running scheduler job")
+        mock_process.assert_called_once_with(target=serve_logs)
+        mock_process().terminate.assert_called_once_with()
+
 
 # Creating MockServer subclass of the HealthServer handler so that we can test the do_GET logic
 class MockServer(HealthServer):

Reply via email to