This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4d44e716ec AIP84: Check standalone_dag_processor config in
get_airflow_health() and update health endpoint (#44383)
c4d44e716ec is described below
commit c4d44e716ec2c92d3d032c5c97448559c23d0b0b
Author: vatsrahul1001 <[email protected]>
AuthorDate: Fri Nov 29 20:50:08 2024 +0530
AIP84: Check standalone_dag_processor config in get_airflow_health() and
update health endpoint (#44383)
* AIP84 health endpoint: skip the DAG processor check when
  [scheduler] standalone_dag_processor is disabled
* Fix test_airflow_health tests and add tests for the disabled case
* Use a single model class, HealthInfoSchema
* Implement review comments
* Fix static checks
---
airflow/api/common/airflow_health.py | 40 ++++---
airflow/api_fastapi/core_api/datamodels/monitor.py | 2 +-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 5 +-
.../api_fastapi/core_api/routes/public/monitor.py | 4 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 +-
airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
airflow/ui/src/pages/Dashboard/Health/Health.tsx | 5 +-
tests/api/common/test_airflow_health.py | 116 ++++++++++++++++++++-
.../core_api/routes/public/test_monitor.py | 52 +++++++++
9 files changed, 205 insertions(+), 32 deletions(-)
diff --git a/airflow/api/common/airflow_health.py
b/airflow/api/common/airflow_health.py
index 5d37de540a4..043557fa0db 100644
--- a/airflow/api/common/airflow_health.py
+++ b/airflow/api/common/airflow_health.py
@@ -18,6 +18,7 @@ from __future__ import annotations
from typing import Any
+from airflow.configuration import conf
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -28,13 +29,13 @@ UNHEALTHY = "unhealthy"
def get_airflow_health() -> dict[str, Any]:
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
+ dag_processor_enabled = conf.getboolean("scheduler",
"standalone_dag_processor")
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
latest_triggerer_heartbeat = None
- latest_dag_processor_heartbeat = None
+
scheduler_status = UNHEALTHY
triggerer_status: str | None = UNHEALTHY
- dag_processor_status: str | None = UNHEALTHY
try:
latest_scheduler_job = SchedulerJobRunner.most_recent_job()
@@ -58,18 +59,6 @@ def get_airflow_health() -> dict[str, Any]:
except Exception:
metadatabase_status = UNHEALTHY
- try:
- latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()
-
- if latest_dag_processor_job:
- latest_dag_processor_heartbeat =
latest_dag_processor_job.latest_heartbeat.isoformat()
- if latest_dag_processor_job.is_alive():
- dag_processor_status = HEALTHY
- else:
- dag_processor_status = None
- except Exception:
- metadatabase_status = UNHEALTHY
-
airflow_health_status = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
@@ -80,10 +69,27 @@ def get_airflow_health() -> dict[str, Any]:
"status": triggerer_status,
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
},
- "dag_processor": {
+ }
+
+ if dag_processor_enabled:
+ latest_dag_processor_heartbeat = None
+ dag_processor_status: str | None = UNHEALTHY
+
+ try:
+ latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()
+
+ if latest_dag_processor_job:
+ latest_dag_processor_heartbeat =
latest_dag_processor_job.latest_heartbeat.isoformat()
+ if latest_dag_processor_job.is_alive():
+ dag_processor_status = HEALTHY
+ else:
+ dag_processor_status = None
+        except Exception:
+            airflow_health_status["metadatabase"]["status"] = UNHEALTHY  # dict already built above
+
+ airflow_health_status["dag_processor"] = {
"status": dag_processor_status,
"latest_dag_processor_heartbeat": latest_dag_processor_heartbeat,
- },
- }
+ }
return airflow_health_status
diff --git a/airflow/api_fastapi/core_api/datamodels/monitor.py
b/airflow/api_fastapi/core_api/datamodels/monitor.py
index fbaf40b4e84..f4434034424 100644
--- a/airflow/api_fastapi/core_api/datamodels/monitor.py
+++ b/airflow/api_fastapi/core_api/datamodels/monitor.py
@@ -49,4 +49,4 @@ class HealthInfoSchema(BaseModel):
metadatabase: BaseInfoSchema
scheduler: SchedulerInfoSchema
triggerer: TriggererInfoSchema
- dag_processor: DagProcessorInfoSchema
+ dag_processor: DagProcessorInfoSchema | None = None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index a38fa874c08..9832c3668fe 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7586,13 +7586,14 @@ components:
triggerer:
$ref: '#/components/schemas/TriggererInfoSchema'
dag_processor:
- $ref: '#/components/schemas/DagProcessorInfoSchema'
+ anyOf:
+ - $ref: '#/components/schemas/DagProcessorInfoSchema'
+ - type: 'null'
type: object
required:
- metadatabase
- scheduler
- triggerer
- - dag_processor
title: HealthInfoSchema
description: Schema for the Health endpoint.
HistoricalMetricDataResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/monitor.py
b/airflow/api_fastapi/core_api/routes/public/monitor.py
index 38953a18b99..538bdcb3521 100644
--- a/airflow/api_fastapi/core_api/routes/public/monitor.py
+++ b/airflow/api_fastapi/core_api/routes/public/monitor.py
@@ -24,7 +24,7 @@ from airflow.api_fastapi.core_api.datamodels.monitor import
HealthInfoSchema
monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")
-@monitor_router.get("/health")
-def get_health() -> HealthInfoSchema:
+@monitor_router.get("/health", response_model=HealthInfoSchema,
response_model_exclude_unset=True)
+def get_health():
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b9289674a18..13e657d0083 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2886,11 +2886,18 @@ export const $HealthInfoSchema = {
$ref: "#/components/schemas/TriggererInfoSchema",
},
dag_processor: {
- $ref: "#/components/schemas/DagProcessorInfoSchema",
+ anyOf: [
+ {
+ $ref: "#/components/schemas/DagProcessorInfoSchema",
+ },
+ {
+ type: "null",
+ },
+ ],
},
},
type: "object",
- required: ["metadatabase", "scheduler", "triggerer", "dag_processor"],
+ required: ["metadatabase", "scheduler", "triggerer"],
title: "HealthInfoSchema",
description: "Schema for the Health endpoint.",
} as const;
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 89d8ab8a145..2035b4579fe 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -705,7 +705,7 @@ export type HealthInfoSchema = {
metadatabase: BaseInfoSchema;
scheduler: SchedulerInfoSchema;
triggerer: TriggererInfoSchema;
- dag_processor: DagProcessorInfoSchema;
+ dag_processor?: DagProcessorInfoSchema | null;
};
/**
diff --git a/airflow/ui/src/pages/Dashboard/Health/Health.tsx
b/airflow/ui/src/pages/Dashboard/Health/Health.tsx
index 0694283930c..040f9a04969 100644
--- a/airflow/ui/src/pages/Dashboard/Health/Health.tsx
+++ b/airflow/ui/src/pages/Dashboard/Health/Health.tsx
@@ -54,15 +54,14 @@ export const Health = () => {
status={data?.triggerer.status}
title="Triggerer"
/>
- {/* TODO: Update this to match the API when we move the config check
to the API level */}
- {data?.dag_processor.status === undefined ? undefined : (
+ {data?.dag_processor ? (
<HealthTag
isLoading={isLoading}
latestHeartbeat={data.dag_processor.latest_dag_processor_heartbeat}
status={data.dag_processor.status}
title="Dag Processor"
/>
- )}
+ ) : undefined}
</HStack>
</Box>
);
diff --git a/tests/api/common/test_airflow_health.py
b/tests/api/common/test_airflow_health.py
index ebdc086c692..06bd1a7bf4c 100644
--- a/tests/api/common/test_airflow_health.py
+++ b/tests/api/common/test_airflow_health.py
@@ -27,14 +27,19 @@ from airflow.api.common.airflow_health import (
get_airflow_health,
)
+from tests_common.test_utils.config import conf_vars
+
pytestmark = pytest.mark.db_test
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
return_value=None)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_only_metadatabase_healthy(
- latest_scheduler_job_mock, latest_triggerer_job_mock,
latest_dag_processor_job_mock
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
):
health_status = get_airflow_health()
expected_status = {
@@ -50,8 +55,11 @@ def test_get_airflow_health_only_metadatabase_healthy(
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
return_value=Exception)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=Exception)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=Exception)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_metadatabase_unhealthy(
- latest_scheduler_job_mock, latest_triggerer_job_mock,
latest_dag_processor_job_mock
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
):
health_status = get_airflow_health()
@@ -65,6 +73,45 @@ def test_get_airflow_health_metadatabase_unhealthy(
assert health_status == expected_status
+@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
return_value=None)
+@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=None)
+@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def
test_get_airflow_health_only_metadatabase_healthy_with_dag_processor_disabled(
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
+):
+ health_status = get_airflow_health()
+ expected_status = {
+ "metadatabase": {"status": HEALTHY},
+ "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
+ "triggerer": {"status": None, "latest_triggerer_heartbeat": None},
+ }
+
+ assert health_status == expected_status
+
+
+@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
return_value=Exception)
+@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=Exception)
+@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=Exception)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def test_get_airflow_health_metadatabase_unhealthy_with_dag_processor_disabled(
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
+):
+ health_status = get_airflow_health()
+
+ expected_status = {
+ "metadatabase": {"status": UNHEALTHY},
+ "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
+ "triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None},
+ }
+
+ assert health_status == expected_status
+
+
LATEST_SCHEDULER_JOB_MOCK = MagicMock()
LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat = datetime.now()
LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True)
@@ -76,8 +123,11 @@ LATEST_SCHEDULER_JOB_MOCK.is_alive =
MagicMock(return_value=True)
)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_scheduler_healthy_no_triggerer(
- latest_scheduler_job_mock, latest_triggerer_job_mock,
latest_dag_processor_job_mock
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
):
health_status = get_airflow_health()
@@ -94,6 +144,32 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer(
assert health_status == expected_status
+@patch(
+ "airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
+ return_value=LATEST_SCHEDULER_JOB_MOCK,
+)
+@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=None)
+@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=None)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def
test_get_airflow_health_scheduler_healthy_no_triggerer_with_dag_processor_disabled(
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
+):
+ health_status = get_airflow_health()
+
+ expected_status = {
+ "metadatabase": {"status": HEALTHY},
+ "scheduler": {
+ "status": HEALTHY,
+ "latest_scheduler_heartbeat":
LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat.isoformat(),
+ },
+ "triggerer": {"status": None, "latest_triggerer_heartbeat": None},
+ }
+
+ assert health_status == expected_status
+
+
LATEST_TRIGGERER_JOB_MOCK = MagicMock()
LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat = datetime.now()
LATEST_TRIGGERER_JOB_MOCK.is_alive = MagicMock(return_value=True)
@@ -112,8 +188,11 @@ LATEST_DAG_PROCESSOR_JOB_MOCK.is_alive =
MagicMock(return_value=True)
"airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
)
+@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
- latest_scheduler_job_mock, latest_triggerer_job_mock,
latest_dag_processor_job_mock
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
):
health_status = get_airflow_health()
@@ -131,3 +210,32 @@ def
test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
}
assert health_status == expected_status
+
+
+@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
return_value=None)
+@patch(
+ "airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
+ return_value=LATEST_TRIGGERER_JOB_MOCK,
+)
+@patch(
+ "airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
+ return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
+)
+@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
+def
test_get_airflow_health_triggerer_healthy_no_scheduler_job_record_with_dag_processor_disabled(
+ latest_scheduler_job_mock,
+ latest_triggerer_job_mock,
+ latest_dag_processor_job_mock,
+):
+ health_status = get_airflow_health()
+
+ expected_status = {
+ "metadatabase": {"status": HEALTHY},
+ "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
+ "triggerer": {
+ "status": HEALTHY,
+ "latest_triggerer_heartbeat":
LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat.isoformat(),
+ },
+ }
+
+ assert health_status == expected_status
diff --git a/tests/api_fastapi/core_api/routes/public/test_monitor.py
b/tests/api_fastapi/core_api/routes/public/test_monitor.py
index d736291180a..d34031f8be9 100644
--- a/tests/api_fastapi/core_api/routes/public/test_monitor.py
+++ b/tests/api_fastapi/core_api/routes/public/test_monitor.py
@@ -103,3 +103,55 @@ class TestGetHealth(TestMonitorEndpoint):
assert body["metadatabase"]["status"] == "unhealthy"
assert body["scheduler"]["latest_scheduler_heartbeat"] is None
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
+ def test_health_with_dag_processor(self, mock_get_airflow_health,
test_client):
+ mock_get_airflow_health.return_value = {
+ "metadatabase": {"status": HEALTHY},
+ "scheduler": {
+ "status": HEALTHY,
+ "latest_scheduler_heartbeat":
"2024-11-23T11:09:16.663124+00:00",
+ },
+ "triggerer": {
+ "status": HEALTHY,
+ "latest_triggerer_heartbeat":
"2024-11-23T11:09:15.815483+00:00",
+ },
+ "dag_processor": {
+ "status": HEALTHY,
+ "latest_dag_processor_heartbeat":
"2024-11-23T11:09:15.815483+00:00",
+ },
+ }
+
+ response = test_client.get("/public/monitor/health")
+
+ assert response.status_code == 200
+ body = response.json()
+
+ assert "dag_processor" in body
+ assert body["metadatabase"]["status"] == HEALTHY
+ assert body["scheduler"]["status"] == HEALTHY
+ assert body["triggerer"]["status"] == HEALTHY
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
+ def test_health_without_dag_processor(self, mock_get_airflow_health,
test_client):
+ mock_get_airflow_health.return_value = {
+ "metadatabase": {"status": HEALTHY},
+ "scheduler": {
+ "status": HEALTHY,
+ "latest_scheduler_heartbeat":
"2024-11-23T11:09:16.663124+00:00",
+ },
+ "triggerer": {
+ "status": HEALTHY,
+ "latest_triggerer_heartbeat":
"2024-11-23T11:09:15.815483+00:00",
+ },
+ }
+
+ response = test_client.get("/public/monitor/health")
+
+ assert response.status_code == 200
+ body = response.json()
+
+ assert "dag_processor" not in body
+ assert body["metadatabase"]["status"] == HEALTHY
+ assert body["scheduler"]["status"] == HEALTHY
+ assert body["triggerer"]["status"] == HEALTHY