This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cffbb58bb4c Fix Breeze K8s: Patch ConfigMap [api/base_url] with
Port-Forwarded Host Port (#47544)
cffbb58bb4c is described below
commit cffbb58bb4c2fe45d9a325439b99abfffd17a580
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Sun Mar 9 16:18:48 2025 +0800
Fix Breeze K8s: Patch ConfigMap [api/base_url] with Port-Forwarded Host
Port (#47544)
---
kubernetes_tests/test_base.py | 61 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 61 insertions(+)
diff --git a/kubernetes_tests/test_base.py b/kubernetes_tests/test_base.py
index fae5ae6eb30..31e1924c18a 100644
--- a/kubernetes_tests/test_base.py
+++ b/kubernetes_tests/test_base.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import json
import os
import re
import subprocess
@@ -58,6 +59,9 @@ class BaseK8STest:
@pytest.fixture(autouse=True)
def base_tests_setup(self, request):
+ self.set_api_server_base_url_config()
+ self.rollout_restart_deployment("airflow-api-server")
+ self.ensure_deployment_health("airflow-api-server")
# Replacement for unittests.TestCase.id()
self.test_id = f"{request.node.cls.__name__}_{request.node.name}"
self.session = self._get_session_with_retries()
@@ -204,6 +208,63 @@ class BaseK8STest:
).decode()
assert "successfully rolled out" in deployment_rollout_status
+ @staticmethod
+ def rollout_restart_deployment(deployment_name: str, namespace: str =
"airflow"):
+ """Rollout restart the deployment."""
+ check_call(["kubectl", "rollout", "restart", "deployment",
deployment_name, "-n", namespace])
+
+ def _parse_airflow_cfg_as_dict(self, airflow_cfg: str) -> dict[str,
dict[str, str]]:
+ """Parse the airflow.cfg file as a dictionary."""
+ parsed_airflow_cfg: dict[str, dict[str, str]] = {}
+ for line in airflow_cfg.splitlines():
+ if line.startswith("["):
+ section = line[1:-1]
+ parsed_airflow_cfg[section] = {}
+ elif "=" in line:
+ key, value = line.split("=", 1)
+ parsed_airflow_cfg[section][key.strip()] = value.strip()
+ return parsed_airflow_cfg
+
+ def _parse_airflow_cfg_dict_as_escaped_toml(self, airflow_cfg_dict: dict)
-> str:
+ """Parse the airflow.cfg dictionary as a toml string."""
+ airflow_cfg_str = ""
+ for section, section_dict in airflow_cfg_dict.items():
+ airflow_cfg_str += f"[{section}]\n"
+ for key, value in section_dict.items():
+ airflow_cfg_str += f"{key} = {value}\n"
+ airflow_cfg_str += "\n"
+ # escape newlines and double quotes
+ return airflow_cfg_str.replace("\n", "\\n").replace('"', '\\"')
+
+ def set_api_server_base_url_config(self):
+ """Set [api/base_url] with `f"http://{KUBERNETES_HOST_PORT}"` as env
in k8s configmap."""
+ configmap_name = "airflow-config"
+ configmap_key = "airflow.cfg"
+ original_configmap_json_str = check_output(
+ ["kubectl", "get", "configmap", configmap_name, "-n", "airflow",
"-o", "json"]
+ ).decode()
+ original_config_map = json.loads(original_configmap_json_str)
+ original_airflow_cfg = original_config_map["data"][configmap_key]
+ # set [api/base_url] with `f"http://{KUBERNETES_HOST_PORT}"` in
airflow.cfg
+ # The airflow.cfg is toml format, so we need to convert it to json
+ airflow_cfg_dict =
self._parse_airflow_cfg_as_dict(original_airflow_cfg)
+ airflow_cfg_dict["api"]["base_url"] = f"http://{KUBERNETES_HOST_PORT}"
+ # update the configmap with the new airflow.cfg
+ check_call(
+ [
+ "kubectl",
+ "patch",
+ "configmap",
+ configmap_name,
+ "-n",
+ "airflow",
+ "--type",
+ "merge",
+ "-p",
+ f'{{"data": {{"{configmap_key}":
"{self._parse_airflow_cfg_dict_as_escaped_toml(airflow_cfg_dict)}"}}}}',
+ ]
+ )
+
def ensure_dag_expected_state(self, host, logical_date, dag_id,
expected_final_state, timeout):
tries = 0
state = ""