This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e729a2e8cb221b0f678c16d86fe7a249e824a544
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Apr 24 20:51:40 2025 +0200
Fix k8s test: scheduler crash when using LocalExecutor (#49677) (#49718)
(cherry picked from commit 2372dcd07cac10d33a4a991345d2694a365b826a)
Co-authored-by: LIU ZHE YOU <[email protected]>
---
.../tests/kubernetes_tests/test_base.py | 25 ++++++++++++++++------
.../kubernetes_tests/test_kubernetes_executor.py | 2 +-
.../tests/kubernetes_tests/test_other_executors.py | 18 ++++++++--------
3 files changed, 29 insertions(+), 16 deletions(-)
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index d453d94e266..44fdd3b29e2 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -24,6 +24,7 @@ import time
from datetime import datetime, timezone
from pathlib import Path
from subprocess import check_call, check_output
+from typing import Literal
import pytest
import requests
@@ -65,7 +66,7 @@ class BaseK8STest:
# Replacement for unittests.TestCase.id()
self.test_id = f"{request.node.cls.__name__}_{request.node.name}"
# Ensure the api-server deployment is healthy at kubernetes level before calling the any API
- self.ensure_deployment_health("airflow-api-server")
+ self.ensure_resource_health("airflow-api-server")
try:
self.session = self._get_session_with_retries()
self._ensure_airflow_api_server_is_healthy()
@@ -227,12 +228,24 @@ class BaseK8STest:
assert state == expected_final_state
@staticmethod
- def ensure_deployment_health(deployment_name: str, namespace: str = "airflow"):
- """Watch the deployment until it is healthy."""
- deployment_rollout_status = check_output(
- ["kubectl", "rollout", "status", "deployment", deployment_name, "-n", namespace, "--watch"]
+ def ensure_resource_health(
+ resource_name: str,
+ namespace: str = "airflow",
+ resource_type: Literal["deployment", "statefulset"] = "deployment",
+ ):
+ """Watch the resource until it is healthy.
+ Args:
+ resource_name (str): Name of the resource to check.
+ resource_type (str): Type of the resource (e.g., deployment, statefulset).
+ namespace (str): Kubernetes namespace where the resource is located.
+ """
+ rollout_status = check_output(
+ ["kubectl", "rollout", "status", f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
).decode()
- assert "successfully rolled out" in deployment_rollout_status
+ if resource_type == "deployment":
+ assert "successfully rolled out" in rollout_status
+ else:
+ assert "roll out complete" in rollout_status
def ensure_dag_expected_state(self, host, logical_date, dag_id,
expected_final_state, timeout):
tries = 0
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
index b3898f46027..c7e1322bc77 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
@@ -81,7 +81,7 @@ class TestKubernetesExecutor(BaseK8STest):
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
self._delete_airflow_pod("scheduler")
- self.ensure_deployment_health("airflow-scheduler")
+ self.ensure_resource_health("airflow-scheduler")
# Wait some time for the operator to complete
self.monitor_task(
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
index 327e252825a..7b49bf9f563 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
@@ -29,10 +29,6 @@ from kubernetes_tests.test_base import (
# Also, the skipping is necessary as there's no gain in running these tests in KubernetesExecutor
@pytest.mark.skipif(EXECUTOR == "KubernetesExecutor", reason="Does not run on KubernetesExecutor")
class TestCeleryAndLocalExecutor(BaseK8STest):
- @pytest.mark.xfail(
- EXECUTOR == "LocalExecutor",
- reason="https://github.com/apache/airflow/issues/47518 needs to be fixed",
- )
def test_integration_run_dag(self):
dag_id = "example_bash_operator"
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
@@ -56,17 +52,21 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
timeout=300,
)
- @pytest.mark.xfail(
- EXECUTOR == "LocalExecutor",
- reason="https://github.com/apache/airflow/issues/47518 needs to be fixed",
- )
def test_integration_run_dag_with_scheduler_failure(self):
dag_id = "example_xcom"
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
self._delete_airflow_pod("scheduler")
- self.ensure_deployment_health("airflow-scheduler")
+
+ # Wait for the scheduler to be recreated
+ if EXECUTOR == "CeleryExecutor":
+ scheduler_resource_type = "deployment"
+ elif EXECUTOR == "LocalExecutor":
+ scheduler_resource_type = "statefulset"
+ else:
+ raise ValueError(f"Unknown executor {EXECUTOR}")
+ self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type)
# Wait some time for the operator to complete
self.monitor_task(