This is an automated email from the ASF dual-hosted git repository. kamilbregula pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 6b46240520 Added small health check server and endpoint in scheduler and updated… (#23905) 6b46240520 is described below commit 6b46240520199e850172b578bf1861c4a2f6e2ac Author: Michael Petro <40223998+michaelmich...@users.noreply.github.com> AuthorDate: Fri Jun 10 00:00:03 2022 -0400 Added small health check server and endpoint in scheduler and updated… (#23905) Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com> --- airflow/cli/commands/scheduler_command.py | 21 +++++- airflow/config_templates/config.yml | 16 +++++ airflow/config_templates/default_airflow.cfg | 8 +++ airflow/config_templates/default_test.cfg | 1 + airflow/utils/scheduler_health.py | 58 +++++++++++++++ .../logging-monitoring/check-health.rst | 20 +++++- tests/cli/commands/test_scheduler_command.py | 83 +++++++++++++++++++++- 7 files changed, 198 insertions(+), 9 deletions(-) diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index bc6e983ee5..e4b0309e1d 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -24,9 +24,11 @@ import daemon from daemon.pidfile import TimeoutPIDLockFile from airflow import settings +from airflow.configuration import conf from airflow.jobs.scheduler_job import SchedulerJob from airflow.utils import cli as cli_utils from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler +from airflow.utils.scheduler_health import serve_health_check def _create_scheduler_job(args): @@ -41,12 +43,16 @@ def _create_scheduler_job(args): def _run_scheduler_job(args): skip_serve_logs = args.skip_serve_logs job = _create_scheduler_job(args) - sub_proc = _serve_logs(skip_serve_logs) + logs_sub_proc = _serve_logs(skip_serve_logs) + enable_health_check = conf.getboolean('scheduler', 'ENABLE_HEALTH_CHECK') + health_sub_proc = 
_serve_health_check(enable_health_check) try: job.run() finally: - if sub_proc: - sub_proc.terminate() + if logs_sub_proc: + logs_sub_proc.terminate() + if health_sub_proc: + health_sub_proc.terminate() @cli_utils.action_cli @@ -86,3 +92,12 @@ def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]: sub_proc.start() return sub_proc return None + + +def _serve_health_check(enable_health_check: bool = False) -> Optional[Process]: + """Starts serve_health_check sub-process""" + if enable_health_check: + sub_proc = Process(target=serve_health_check) + sub_proc.start() + return sub_proc + return None diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f884959a88..b7eb88d621 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1943,6 +1943,22 @@ type: string example: ~ default: "30" + - name: enable_health_check + description: | + When you start a scheduler, airflow starts a tiny web server + subprocess to serve a health check if this is set to True + version_added: 2.4.0 + type: boolean + example: ~ + default: "False" + - name: scheduler_health_check_server_port + description: | + When you start a scheduler, airflow starts a tiny web server + subprocess to serve a health check on this port + version_added: 2.4.0 + type: string + example: ~ + default: "8974" - name: orphaned_tasks_check_interval description: | How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index b5c1d4290b..f47d0a82a2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -980,6 +980,14 @@ pool_metrics_interval = 5.0 # This is used by the health check in the "/health" endpoint scheduler_health_check_threshold = 30 +# When you start a scheduler, airflow starts a tiny web server +# subprocess to serve a health 
check if this is set to True +enable_health_check = False + +# When you start a scheduler, airflow starts a tiny web server +# subprocess to serve a health check on this port +scheduler_health_check_server_port = 8974 + +# How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs orphaned_tasks_check_interval = 300.0 child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 83260d0d52..7a7e21bc10 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -110,6 +110,7 @@ job_heartbeat_sec = 1 schedule_after_task_execution = False scheduler_heartbeat_sec = 5 scheduler_health_check_threshold = 30 +scheduler_health_check_server_port = 8974 parsing_processes = 2 catchup_by_default = True scheduler_zombie_task_threshold = 300 diff --git a/airflow/utils/scheduler_health.py b/airflow/utils/scheduler_health.py new file mode 100644 index 0000000000..572f2b7e88 --- /dev/null +++ b/airflow/utils/scheduler_health.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from http.server import BaseHTTPRequestHandler, HTTPServer + +from airflow.configuration import conf +from airflow.jobs.scheduler_job import SchedulerJob +from airflow.utils.net import get_hostname +from airflow.utils.session import create_session + + +class HealthServer(BaseHTTPRequestHandler): + """Small webserver to serve scheduler health check""" + + def do_GET(self): + if self.path == '/health': + try: + with create_session() as session: + scheduler_job = ( + session.query(SchedulerJob) + .filter_by(hostname=get_hostname()) + .order_by(SchedulerJob.latest_heartbeat.desc()) + .limit(1) + .first() + ) + if scheduler_job and scheduler_job.is_alive(): + self.send_response(200) + self.end_headers() + else: + self.send_error(503) + except Exception: + self.send_error(503) + else: + self.send_error(404) + + +def serve_health_check(): + health_check_port = conf.getint('scheduler', 'SCHEDULER_HEALTH_CHECK_SERVER_PORT') + httpd = HTTPServer(("0.0.0.0", health_check_port), HealthServer) + httpd.serve_forever() + + +if __name__ == "__main__": + serve_health_check() diff --git a/docs/apache-airflow/logging-monitoring/check-health.rst b/docs/apache-airflow/logging-monitoring/check-health.rst index f485483276..4dd724b734 100644 --- a/docs/apache-airflow/logging-monitoring/check-health.rst +++ b/docs/apache-airflow/logging-monitoring/check-health.rst @@ -32,8 +32,8 @@ For an example for a Docker Compose environment, see the ``docker-compose.yaml`` .. _check-health/http-endpoint: -Health Check Endpoint ---------------------- +Webserver Health Check Endpoint +------------------------------- To check the health status of your Airflow instance, you can simply access the endpoint ``/health``. It will return a JSON object in which a high-level glance is provided. 
@@ -66,11 +66,25 @@ To check the health status of your Airflow instance, you can simply access the e Please keep in mind that the HTTP response code of ``/health`` endpoint **should not** be used to determine the health status of the application. The return code is only indicative of the state of the rest call (200 for success). +Served by the web server, this health check endpoint is independent of the newer :ref:`Scheduler Health Check Server <check-health/scheduler-health-check-server>`, which optionally runs on each scheduler. + .. note:: For this check to work, at least one working web server is required. Suppose you use this check for scheduler monitoring, then in case of failure of the web server, you will lose the ability to monitor scheduler, which means - that it can be restarted even if it is in good condition. For greater confidence, consider using :ref:`CLI Check for Scheduler <check-health/cli-checks-for-scheduler>`. + that it can be restarted even if it is in good condition. For greater confidence, consider using :ref:`CLI Check for Scheduler <check-health/cli-checks-for-scheduler>` or :ref:`Scheduler Health Check Server <check-health/scheduler-health-check-server>`. + +.. _check-health/scheduler-health-check-server: + +Scheduler Health Check Server +----------------------------- + +In order to check scheduler health independent of the web server, Airflow optionally starts a small HTTP server +in each scheduler to serve a scheduler ``/health`` endpoint. It returns status code ``200`` when the scheduler +is healthy and status code ``503`` when the scheduler is unhealthy. To run this server in each scheduler, set +``[scheduler]enable_health_check`` to ``True``. By default, it is ``False``. The server is running on the port +specified by the ``[scheduler]scheduler_health_check_server_port`` option. By default, it is ``8974``. 
We are +using `http.server.BaseHTTPRequestHandler <https://docs.python.org/3/library/http.server.html#http.server.BaseHTTPRequestHandler>`__ as a small server. .. _check-health/cli-checks-for-scheduler: diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py index 00b851f55a..3cce9c95d2 100644 --- a/tests/cli/commands/test_scheduler_command.py +++ b/tests/cli/commands/test_scheduler_command.py @@ -16,12 +16,15 @@ # specific language governing permissions and limitations # under the License. import unittest +from http.server import BaseHTTPRequestHandler from unittest import mock +from unittest.mock import MagicMock from parameterized import parameterized from airflow.cli import cli_parser from airflow.cli.commands import scheduler_command +from airflow.utils.scheduler_health import HealthServer, serve_health_check from airflow.utils.serve_logs import serve_logs from tests.test_utils.config import conf_vars @@ -53,9 +56,10 @@ class TestSchedulerCommand(unittest.TestCase): with conf_vars({("core", "executor"): executor}): scheduler_command.scheduler(args) if expect_serve_logs: - mock_process.assert_called_once_with(target=serve_logs) + mock_process.assert_has_calls([mock.call(target=serve_logs)]) else: - mock_process.assert_not_called() + with self.assertRaises(AssertionError): + mock_process.assert_has_calls([mock.call(target=serve_logs)]) @parameterized.expand( [ @@ -69,7 +73,8 @@ class TestSchedulerCommand(unittest.TestCase): args = self.parser.parse_args(['scheduler', '--skip-serve-logs']) with conf_vars({("core", "executor"): executor}): scheduler_command.scheduler(args) - mock_process.assert_not_called() + with self.assertRaises(AssertionError): + mock_process.assert_has_calls([mock.call(target=serve_logs)]) @parameterized.expand( [ @@ -87,3 +92,75 @@ class TestSchedulerCommand(unittest.TestCase): scheduler_command.scheduler(args) finally: mock_process().terminate.assert_called() + + 
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob") + @mock.patch("airflow.cli.commands.scheduler_command.Process") + def test_enable_scheduler_health( + self, + mock_process, + mock_scheduler_job, + ): + with conf_vars({("scheduler", "enable_health_check"): "True"}): + args = self.parser.parse_args(['scheduler']) + scheduler_command.scheduler(args) + mock_process.assert_has_calls([mock.call(target=serve_health_check)]) + + @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob") + @mock.patch("airflow.cli.commands.scheduler_command.Process") + def test_disable_scheduler_health( + self, + mock_process, + mock_scheduler_job, + ): + args = self.parser.parse_args(['scheduler']) + scheduler_command.scheduler(args) + with self.assertRaises(AssertionError): + mock_process.assert_has_calls([mock.call(target=serve_health_check)]) + + +# Creating MockServer subclass of the HealthServer handler so that we can test the do_GET logic +class MockServer(HealthServer): + def __init__(self): + # Overriding so we don't need to initialize with BaseHTTPRequestHandler.__init__ params + pass + + def do_GET(self, path): + self.path = path + super().do_GET() + + +class TestSchedulerHealthServer(unittest.TestCase): + def setUp(self) -> None: + self.mock_server = MockServer() + + @mock.patch.object(BaseHTTPRequestHandler, "send_error") + def test_incorrect_endpoint(self, mock_send_error): + self.mock_server.do_GET("/incorrect") + mock_send_error.assert_called_with(404) + + @mock.patch.object(BaseHTTPRequestHandler, "end_headers") + @mock.patch.object(BaseHTTPRequestHandler, "send_response") + @mock.patch('airflow.utils.scheduler_health.create_session') + def test_healthy_scheduler(self, mock_session, mock_send_response, mock_end_headers): + mock_scheduler_job = MagicMock() + mock_scheduler_job.is_alive.return_value = True + mock_session.return_value.__enter__.return_value.query.return_value = mock_scheduler_job + self.mock_server.do_GET("/health") + 
mock_send_response.assert_called_once_with(200) + mock_end_headers.assert_called_once() + + @mock.patch.object(BaseHTTPRequestHandler, "send_error") + @mock.patch('airflow.utils.scheduler_health.create_session') + def test_unhealthy_scheduler(self, mock_session, mock_send_error): + mock_scheduler_job = MagicMock() + mock_scheduler_job.is_alive.return_value = False + mock_session.return_value.__enter__.return_value.query.return_value = mock_scheduler_job + self.mock_server.do_GET("/health") + mock_send_error.assert_called_with(503) + + @mock.patch.object(BaseHTTPRequestHandler, "send_error") + @mock.patch('airflow.utils.scheduler_health.create_session') + def test_missing_scheduler(self, mock_session, mock_send_error): + mock_session.return_value.__enter__.return_value.query.return_value = None + self.mock_server.do_GET("/health") + mock_send_error.assert_called_with(503)