This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e729a2e8cb221b0f678c16d86fe7a249e824a544
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Apr 24 20:51:40 2025 +0200

    Fix k8s test: scheduler crash when using LocalExecutor (#49677) (#49718)
    
    (cherry picked from commit 2372dcd07cac10d33a4a991345d2694a365b826a)
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
---
 .../tests/kubernetes_tests/test_base.py            | 25 ++++++++++++++++------
 .../kubernetes_tests/test_kubernetes_executor.py   |  2 +-
 .../tests/kubernetes_tests/test_other_executors.py | 18 ++++++++--------
 3 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index d453d94e266..44fdd3b29e2 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -24,6 +24,7 @@ import time
 from datetime import datetime, timezone
 from pathlib import Path
 from subprocess import check_call, check_output
+from typing import Literal
 
 import pytest
 import requests
@@ -65,7 +66,7 @@ class BaseK8STest:
         # Replacement for unittests.TestCase.id()
         self.test_id = f"{request.node.cls.__name__}_{request.node.name}"
         # Ensure the api-server deployment is healthy at kubernetes level before calling the any API
-        self.ensure_deployment_health("airflow-api-server")
+        self.ensure_resource_health("airflow-api-server")
         try:
             self.session = self._get_session_with_retries()
             self._ensure_airflow_api_server_is_healthy()
@@ -227,12 +228,24 @@ class BaseK8STest:
         assert state == expected_final_state
 
     @staticmethod
-    def ensure_deployment_health(deployment_name: str, namespace: str = "airflow"):
-        """Watch the deployment until it is healthy."""
-        deployment_rollout_status = check_output(
-            ["kubectl", "rollout", "status", "deployment", deployment_name, "-n", namespace, "--watch"]
+    def ensure_resource_health(
+        resource_name: str,
+        namespace: str = "airflow",
+        resource_type: Literal["deployment", "statefulset"] = "deployment",
+    ):
+        """Watch the resource until it is healthy.
+        Args:
+            resource_name (str): Name of the resource to check.
+            resource_type (str): Type of the resource (e.g., deployment, statefulset).
+            namespace (str): Kubernetes namespace where the resource is located.
+        """
+        rollout_status = check_output(
+            ["kubectl", "rollout", "status", f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
         ).decode()
-        assert "successfully rolled out" in deployment_rollout_status
+        if resource_type == "deployment":
+            assert "successfully rolled out" in rollout_status
+        else:
+            assert "roll out complete" in rollout_status
 
     def ensure_dag_expected_state(self, host, logical_date, dag_id, expected_final_state, timeout):
         tries = 0
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
index b3898f46027..c7e1322bc77 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
@@ -81,7 +81,7 @@ class TestKubernetesExecutor(BaseK8STest):
         dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
 
         self._delete_airflow_pod("scheduler")
-        self.ensure_deployment_health("airflow-scheduler")
+        self.ensure_resource_health("airflow-scheduler")
 
         # Wait some time for the operator to complete
         self.monitor_task(
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
index 327e252825a..7b49bf9f563 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
@@ -29,10 +29,6 @@ from kubernetes_tests.test_base import (
 # Also, the skipping is necessary as there's no gain in running these tests in KubernetesExecutor
 @pytest.mark.skipif(EXECUTOR == "KubernetesExecutor", reason="Does not run on KubernetesExecutor")
 class TestCeleryAndLocalExecutor(BaseK8STest):
-    @pytest.mark.xfail(
-        EXECUTOR == "LocalExecutor",
-        reason="https://github.com/apache/airflow/issues/47518 needs to be fixed",
-    )
     def test_integration_run_dag(self):
         dag_id = "example_bash_operator"
         dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
@@ -56,17 +52,21 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
             timeout=300,
         )
 
-    @pytest.mark.xfail(
-        EXECUTOR == "LocalExecutor",
-        reason="https://github.com/apache/airflow/issues/47518 needs to be fixed",
-    )
     def test_integration_run_dag_with_scheduler_failure(self):
         dag_id = "example_xcom"
 
         dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
 
         self._delete_airflow_pod("scheduler")
-        self.ensure_deployment_health("airflow-scheduler")
+
+        # Wait for the scheduler to be recreated
+        if EXECUTOR == "CeleryExecutor":
+            scheduler_resource_type = "deployment"
+        elif EXECUTOR == "LocalExecutor":
+            scheduler_resource_type = "statefulset"
+        else:
+            raise ValueError(f"Unknown executor {EXECUTOR}")
+        self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type)
 
         # Wait some time for the operator to complete
         self.monitor_task(

Reply via email to