This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 095c5fe313 Ensure __exit__ is called in decorator context managers (#38383)
095c5fe313 is described below
commit 095c5fe3137e2cb6d45e8f3184bae149cb2850d1
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Mar 21 14:06:49 2024 -0700
Ensure __exit__ is called in decorator context managers (#38383)
In #36800 the author fixed a zombie scheduler issue arising from the context manager
exit not being called, and thus the sub-process not being terminated. It was fixed
by explicitly calling the `close` function on an ExitStack-managed context
manager. Simpler / better / cleaner / more standard solution is to "fix" the
underlying context managers by wrapping the yield in a try / finally.
---
airflow/cli/commands/celery_command.py | 8 ++++---
airflow/cli/commands/scheduler_command.py | 32 +++++++++++---------------
airflow/providers/celery/cli/celery_command.py | 8 ++++---
tests/cli/commands/test_scheduler_command.py | 3 ---
4 files changed, 23 insertions(+), 28 deletions(-)
diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index ae8b9c1925..ed6c0dbbd8 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -91,9 +91,11 @@ def _serve_logs(skip_serve_logs: bool = False):
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
- yield
- if sub_proc:
- sub_proc.terminate()
+ try:
+ yield
+ finally:
+ if sub_proc:
+ sub_proc.terminate()
@after_setup_logger.connect()
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 0b5cac8857..2a55ca2373 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import logging
from argparse import Namespace
-from contextlib import ExitStack, contextmanager
+from contextlib import contextmanager
from multiprocessing import Process
from airflow import settings
@@ -45,18 +45,8 @@ def _run_scheduler_job(args) -> None:
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
InternalApiConfig.force_database_direct_access()
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
- with ExitStack() as stack:
- stack.enter_context(_serve_logs(args.skip_serve_logs))
- stack.enter_context(_serve_health_check(enable_health_check))
-
- try:
- run_job(job=job_runner.job, execute_callable=job_runner._execute)
- except Exception:
- log.exception("Exception when running scheduler job")
- raise
- finally:
- # Ensure that the contexts are closed
- stack.close()
+ with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
+ run_job(job=job_runner.job, execute_callable=job_runner._execute)
@cli_utils.action_cli
@@ -84,9 +74,11 @@ def _serve_logs(skip_serve_logs: bool = False):
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
- yield
- if sub_proc:
- sub_proc.terminate()
+ try:
+ yield
+ finally:
+ if sub_proc:
+ sub_proc.terminate()
@contextmanager
@@ -96,6 +88,8 @@ def _serve_health_check(enable_health_check: bool = False):
if enable_health_check:
sub_proc = Process(target=serve_health_check)
sub_proc.start()
- yield
- if sub_proc:
- sub_proc.terminate()
+ try:
+ yield
+ finally:
+ if sub_proc:
+ sub_proc.terminate()
diff --git a/airflow/providers/celery/cli/celery_command.py b/airflow/providers/celery/cli/celery_command.py
index fff46090aa..f7682b9abf 100644
--- a/airflow/providers/celery/cli/celery_command.py
+++ b/airflow/providers/celery/cli/celery_command.py
@@ -107,9 +107,11 @@ def _serve_logs(skip_serve_logs: bool = False):
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
- yield
- if sub_proc:
- sub_proc.terminate()
+ try:
+ yield
+ finally:
+ if sub_proc:
+ sub_proc.terminate()
@after_setup_logger.connect()
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index 2853763563..b6d6a9d921 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -165,10 +165,8 @@ class TestSchedulerCommand:
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@mock.patch("airflow.cli.commands.scheduler_command.run_job", side_effect=Exception("run_job failed"))
- @mock.patch("airflow.cli.commands.scheduler_command.log")
def test_run_job_exception_handling(
self,
- mock_log,
mock_run_job,
mock_process,
mock_scheduler_job,
@@ -183,7 +181,6 @@ class TestSchedulerCommand:
job=mock_scheduler_job().job,
execute_callable=mock_scheduler_job()._execute,
)
- mock_log.exception.assert_called_once_with("Exception when running scheduler job")
mock_process.assert_called_once_with(target=serve_logs)
mock_process().terminate.assert_called_once_with()