This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b46240520 Added small health check server and endpoint in scheduler and updated… (#23905)
6b46240520 is described below

commit 6b46240520199e850172b578bf1861c4a2f6e2ac
Author: Michael Petro <40223998+michaelmich...@users.noreply.github.com>
AuthorDate: Fri Jun 10 00:00:03 2022 -0400

    Added small health check server and endpoint in scheduler and updated… (#23905)
    
    Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com>
---
 airflow/cli/commands/scheduler_command.py          | 21 +++++-
 airflow/config_templates/config.yml                | 16 +++++
 airflow/config_templates/default_airflow.cfg       |  8 +++
 airflow/config_templates/default_test.cfg          |  1 +
 airflow/utils/scheduler_health.py                  | 58 +++++++++++++++
 .../logging-monitoring/check-health.rst            | 20 +++++-
 tests/cli/commands/test_scheduler_command.py       | 83 +++++++++++++++++++++-
 7 files changed, 198 insertions(+), 9 deletions(-)

diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index bc6e983ee5..e4b0309e1d 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -24,9 +24,11 @@ import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 
 from airflow import settings
+from airflow.configuration import conf
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler
+from airflow.utils.scheduler_health import serve_health_check
 
 
 def _create_scheduler_job(args):
@@ -41,12 +43,16 @@ def _create_scheduler_job(args):
 def _run_scheduler_job(args):
     skip_serve_logs = args.skip_serve_logs
     job = _create_scheduler_job(args)
-    sub_proc = _serve_logs(skip_serve_logs)
+    logs_sub_proc = _serve_logs(skip_serve_logs)
+    enable_health_check = conf.getboolean('scheduler', 'ENABLE_HEALTH_CHECK')
+    health_sub_proc = _serve_health_check(enable_health_check)
     try:
         job.run()
     finally:
-        if sub_proc:
-            sub_proc.terminate()
+        if logs_sub_proc:
+            logs_sub_proc.terminate()
+        if health_sub_proc:
+            health_sub_proc.terminate()
 
 
 @cli_utils.action_cli
@@ -86,3 +92,12 @@ def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
             sub_proc.start()
             return sub_proc
     return None
+
+
+def _serve_health_check(enable_health_check: bool = False) -> Optional[Process]:
+    """Starts serve_health_check sub-process"""
+    if enable_health_check:
+        sub_proc = Process(target=serve_health_check)
+        sub_proc.start()
+        return sub_proc
+    return None
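
The new _serve_health_check mirrors _serve_logs: spawn a child process only when the feature is enabled, and always terminate it in the finally block of _run_scheduler_job. Below is a minimal standalone sketch of that pattern, not Airflow code; serve, maybe_start and run_main_work are illustrative names, with serve standing in for serve_health_check / serve_logs.

    import time
    from multiprocessing import Process
    from typing import Optional


    def serve():
        # Stand-in for serve_health_check / serve_logs; runs until terminated.
        while True:
            time.sleep(1)


    def maybe_start(enabled: bool) -> Optional[Process]:
        # Spawn the sidecar only when its feature flag is on.
        if enabled:
            proc = Process(target=serve)
            proc.start()
            return proc
        return None


    def run_main_work(enabled: bool) -> None:
        sidecar = maybe_start(enabled)
        try:
            pass  # the scheduler command runs job.run() here
        finally:
            # Terminate the sidecar even if the main work raised.
            if sidecar:
                sidecar.terminate()
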
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f884959a88..b7eb88d621 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1943,6 +1943,22 @@
       type: string
       example: ~
       default: "30"
+    - name: enable_health_check
+      description: |
+        When you start a scheduler, airflow starts a tiny web server
+        subprocess to serve a health check if this is set to True
+      version_added: 2.4.0
+      type: boolean
+      example: ~
+      default: "False"
+    - name: scheduler_health_check_server_port
+      description: |
+        When you start a scheduler, airflow starts a tiny web server
+        subprocess to serve a health check on this port
+      version_added: 2.4.0
+      type: string
+      example: ~
+      default: "8974"
     - name: orphaned_tasks_check_interval
       description: |
        How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b5c1d4290b..f47d0a82a2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -980,6 +980,14 @@ pool_metrics_interval = 5.0
 # This is used by the health check in the "/health" endpoint
 scheduler_health_check_threshold = 30
 
+# When you start a scheduler, airflow starts a tiny web server
+# subprocess to serve a health check if this is set to True
+enable_health_check = False
+
+# When you start a scheduler, airflow starts a tiny web server
+# subprocess to serve a health check on this port
+scheduler_health_check_server_port = 8974
+
 # How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
 orphaned_tasks_check_interval = 300.0
 child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
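
These two options can also be set through Airflow's standard AIRFLOW__{SECTION}__{KEY} environment variables instead of editing airflow.cfg. A small sketch, using the default values introduced by this commit:

    import os

    # Airflow gives AIRFLOW__{SECTION}__{KEY} environment variables precedence
    # over airflow.cfg, so this is equivalent to editing the [scheduler] section.
    os.environ["AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK"] = "True"
    os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_SERVER_PORT"] = "8974"

    from airflow.configuration import conf

    print(conf.getboolean("scheduler", "enable_health_check"))             # True
    print(conf.getint("scheduler", "scheduler_health_check_server_port"))  # 8974
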
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 83260d0d52..7a7e21bc10 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -110,6 +110,7 @@ job_heartbeat_sec = 1
 schedule_after_task_execution = False
 scheduler_heartbeat_sec = 5
 scheduler_health_check_threshold = 30
+scheduler_health_check_server_port = 8794
 parsing_processes = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
diff --git a/airflow/utils/scheduler_health.py b/airflow/utils/scheduler_health.py
new file mode 100644
index 0000000000..572f2b7e88
--- /dev/null
+++ b/airflow/utils/scheduler_health.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+from airflow.configuration import conf
+from airflow.jobs.scheduler_job import SchedulerJob
+from airflow.utils.net import get_hostname
+from airflow.utils.session import create_session
+
+
+class HealthServer(BaseHTTPRequestHandler):
+    """Small webserver to serve scheduler health check"""
+
+    def do_GET(self):
+        if self.path == '/health':
+            try:
+                with create_session() as session:
+                    scheduler_job = (
+                        session.query(SchedulerJob)
+                        .filter_by(hostname=get_hostname())
+                        .order_by(SchedulerJob.latest_heartbeat.desc())
+                        .limit(1)
+                        .first()
+                    )
+                if scheduler_job and scheduler_job.is_alive():
+                    self.send_response(200)
+                    self.end_headers()
+                else:
+                    self.send_error(503)
+            except Exception:
+                self.send_error(503)
+        else:
+            self.send_error(404)
+
+
+def serve_health_check():
+    health_check_port = conf.getint('scheduler', 'SCHEDULER_HEALTH_CHECK_SERVER_PORT')
+    httpd = HTTPServer(("0.0.0.0", health_check_port), HealthServer)
+    httpd.serve_forever()
+
+
+if __name__ == "__main__":
+    serve_health_check()
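
Once [scheduler] enable_health_check is set to True, the endpoint served by this module can be probed directly. A sketch, assuming the default port 8974 added by this commit and a scheduler running on the same host:

    import urllib.error
    import urllib.request

    # 8974 is the default port added by this commit; localhost is an assumption
    # about where the probe runs relative to the scheduler.
    URL = "http://localhost:8974/health"

    try:
        with urllib.request.urlopen(URL, timeout=5) as response:
            print("scheduler healthy:", response.status == 200)
    except urllib.error.HTTPError as err:
        # The handler answers 503 when the latest heartbeat is too old.
        print("scheduler unhealthy, HTTP", err.code)
    except urllib.error.URLError as err:
        print("health check server not reachable:", err.reason)
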
diff --git a/docs/apache-airflow/logging-monitoring/check-health.rst b/docs/apache-airflow/logging-monitoring/check-health.rst
index f485483276..4dd724b734 100644
--- a/docs/apache-airflow/logging-monitoring/check-health.rst
+++ b/docs/apache-airflow/logging-monitoring/check-health.rst
@@ -32,8 +32,8 @@ For an example for a Docker Compose environment, see the ``docker-compose.yaml``
 
 .. _check-health/http-endpoint:
 
-Health Check Endpoint
----------------------
+Webserver Health Check Endpoint
+-------------------------------
 
 To check the health status of your Airflow instance, you can simply access the endpoint
 ``/health``. It will return a JSON object in which a high-level glance is provided.
@@ -66,11 +66,25 @@ To check the health status of your Airflow instance, you can simply access the e
 Please keep in mind that the HTTP response code of ``/health`` endpoint **should not** be used to determine the health
 status of the application. The return code is only indicative of the state of the rest call (200 for success).
 
+Served by the web server, this health check endpoint is independent of the newer :ref:`Scheduler Health Check Server <check-health/scheduler-health-check-server>`, which optionally runs on each scheduler.
+
 .. note::
 
   For this check to work, at least one working web server is required. Suppose you use this check for scheduler
   monitoring, then in case of failure of the web server, you will lose the ability to monitor scheduler, which means
-  that it can be restarted even if it is in good condition. For greater confidence, consider using :ref:`CLI Check for Scheduler <check-health/cli-checks-for-scheduler>`.
+  that it can be restarted even if it is in good condition. For greater confidence, consider using :ref:`CLI Check for Scheduler <check-health/cli-checks-for-scheduler>` or :ref:`Scheduler Health Check Server <check-health/scheduler-health-check-server>`.
+
+.. _check-health/scheduler-health-check-server:
+
+Scheduler Health Check Server
+-----------------------------
+
+In order to check scheduler health independent of the web server, Airflow optionally starts a small HTTP server
+in each scheduler to serve a scheduler ``/health`` endpoint. It returns status code ``200`` when the scheduler
+is healthy and status code ``503`` when the scheduler is unhealthy. To run this server in each scheduler, set
+``[scheduler]enable_health_check`` to ``True``. By default, it is ``False``. The server runs on the port
+specified by the ``[scheduler]scheduler_health_check_server_port`` option. By default, it is ``8974``. We are
+using `http.server.BaseHTTPRequestHandler <https://docs.python.org/3/library/http.server.html#http.server.BaseHTTPRequestHandler>`__ as a small server.
 
 .. _check-health/cli-checks-for-scheduler:
 
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index 00b851f55a..3cce9c95d2 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -16,12 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 import unittest
+from http.server import BaseHTTPRequestHandler
 from unittest import mock
+from unittest.mock import MagicMock
 
 from parameterized import parameterized
 
 from airflow.cli import cli_parser
 from airflow.cli.commands import scheduler_command
+from airflow.utils.scheduler_health import HealthServer, serve_health_check
 from airflow.utils.serve_logs import serve_logs
 from tests.test_utils.config import conf_vars
 
@@ -53,9 +56,10 @@ class TestSchedulerCommand(unittest.TestCase):
         with conf_vars({("core", "executor"): executor}):
             scheduler_command.scheduler(args)
             if expect_serve_logs:
-                mock_process.assert_called_once_with(target=serve_logs)
+                mock_process.assert_has_calls([mock.call(target=serve_logs)])
             else:
-                mock_process.assert_not_called()
+                with self.assertRaises(AssertionError):
+                    mock_process.assert_has_calls([mock.call(target=serve_logs)])
 
     @parameterized.expand(
         [
@@ -69,7 +73,8 @@ class TestSchedulerCommand(unittest.TestCase):
         args = self.parser.parse_args(['scheduler', '--skip-serve-logs'])
         with conf_vars({("core", "executor"): executor}):
             scheduler_command.scheduler(args)
-            mock_process.assert_not_called()
+            with self.assertRaises(AssertionError):
+                mock_process.assert_has_calls([mock.call(target=serve_logs)])
 
     @parameterized.expand(
         [
@@ -87,3 +92,75 @@ class TestSchedulerCommand(unittest.TestCase):
                 scheduler_command.scheduler(args)
             finally:
                 mock_process().terminate.assert_called()
+
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    def test_enable_scheduler_health(
+        self,
+        mock_process,
+        mock_scheduler_job,
+    ):
+        with conf_vars({("scheduler", "enable_health_check"): "True"}):
+            args = self.parser.parse_args(['scheduler'])
+            scheduler_command.scheduler(args)
+            mock_process.assert_has_calls([mock.call(target=serve_health_check)])
+
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    def test_disable_scheduler_health(
+        self,
+        mock_process,
+        mock_scheduler_job,
+    ):
+        args = self.parser.parse_args(['scheduler'])
+        scheduler_command.scheduler(args)
+        with self.assertRaises(AssertionError):
+            mock_process.assert_has_calls([mock.call(target=serve_health_check)])
+
+
+# Creating MockServer subclass of the HealthServer handler so that we can test the do_GET logic
+class MockServer(HealthServer):
+    def __init__(self):
+        # Overriding so we don't need to initialize with BaseHTTPRequestHandler.__init__ params
+        pass
+
+    def do_GET(self, path):
+        self.path = path
+        super().do_GET()
+
+
+class TestSchedulerHealthServer(unittest.TestCase):
+    def setUp(self) -> None:
+        self.mock_server = MockServer()
+
+    @mock.patch.object(BaseHTTPRequestHandler, "send_error")
+    def test_incorrect_endpoint(self, mock_send_error):
+        self.mock_server.do_GET("/incorrect")
+        mock_send_error.assert_called_with(404)
+
+    @mock.patch.object(BaseHTTPRequestHandler, "end_headers")
+    @mock.patch.object(BaseHTTPRequestHandler, "send_response")
+    @mock.patch('airflow.utils.scheduler_health.create_session')
+    def test_healthy_scheduler(self, mock_session, mock_send_response, mock_end_headers):
+        mock_scheduler_job = MagicMock()
+        mock_scheduler_job.is_alive.return_value = True
+        mock_session.return_value.__enter__.return_value.query.return_value = mock_scheduler_job
+        self.mock_server.do_GET("/health")
+        mock_send_response.assert_called_once_with(200)
+        mock_end_headers.assert_called_once()
+
+    @mock.patch.object(BaseHTTPRequestHandler, "send_error")
+    @mock.patch('airflow.utils.scheduler_health.create_session')
+    def test_unhealthy_scheduler(self, mock_session, mock_send_error):
+        mock_scheduler_job = MagicMock()
+        mock_scheduler_job.is_alive.return_value = False
+        mock_session.return_value.__enter__.return_value.query.return_value = mock_scheduler_job
+        self.mock_server.do_GET("/health")
+        mock_send_error.assert_called_with(503)
+
+    @mock.patch.object(BaseHTTPRequestHandler, "send_error")
+    @mock.patch('airflow.utils.scheduler_health.create_session')
+    def test_missing_scheduler(self, mock_session, mock_send_error):
+        mock_session.return_value.__enter__.return_value.query.return_value = None
+        self.mock_server.do_GET("/health")
+        mock_send_error.assert_called_with(503)

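The MockServer tests above stub out the request machinery entirely. The handler can also be exercised end to end by binding a real HTTPServer to an ephemeral port and patching create_session. A sketch, not part of this commit; probe_health_endpoint_end_to_end is a hypothetical helper name:

    import threading
    import urllib.request
    from http.server import HTTPServer
    from unittest import mock
    from unittest.mock import MagicMock

    from airflow.utils.scheduler_health import HealthServer


    def probe_health_endpoint_end_to_end():
        # Pretend the most recent SchedulerJob row reports a live heartbeat.
        mock_scheduler_job = MagicMock()
        mock_scheduler_job.is_alive.return_value = True
        with mock.patch("airflow.utils.scheduler_health.create_session") as mock_session:
            query = mock_session.return_value.__enter__.return_value.query
            chain = query.return_value.filter_by.return_value.order_by.return_value.limit.return_value
            chain.first.return_value = mock_scheduler_job
            # Port 0 asks the OS for a free ephemeral port.
            httpd = HTTPServer(("127.0.0.1", 0), HealthServer)
            thread = threading.Thread(target=httpd.serve_forever, daemon=True)
            thread.start()
            try:
                url = "http://127.0.0.1:%d/health" % httpd.server_port
                with urllib.request.urlopen(url, timeout=5) as response:
                    assert response.status == 200
            finally:
                httpd.shutdown()
                thread.join()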