This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 27ed4490968f2d740af2d04a90a523405a464914 Author: Kaxil Naik <[email protected]> AuthorDate: Thu Oct 16 11:40:13 2025 +0100 Prevent unnecessary kubernetes client imports in workers (#56692) (cherry picked from commit 4926999cf0f711806075607016b612aa483c87b5) --- .../airflow/serialization/serialized_objects.py | 6 ++++ .../unit/serialization/test_serialized_objects.py | 28 +++++++++++++++ .../secrets_masker/secrets_masker.py | 19 +++++++--- .../tests/secrets_masker/test_secrets_masker.py | 41 ++++++++++++++++++++++ 4 files changed, 89 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index e879654fef1..3a075469d4a 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -28,6 +28,7 @@ import itertools import logging import math import re +import sys import weakref from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence from functools import cached_property, lru_cache @@ -3813,6 +3814,11 @@ def _has_kubernetes() -> bool: if "HAS_KUBERNETES" in globals(): return HAS_KUBERNETES + # Check if kubernetes is already imported before triggering expensive import + if "kubernetes.client" not in sys.modules: + HAS_KUBERNETES = False + return False + # Loading kube modules is expensive, so delay it until the last moment try: diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index a5cc17dc3ed..6dc503ff099 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -19,6 +19,7 @@ from __future__ import annotations import json import math +import sys from collections.abc import Iterator from datetime import datetime, timedelta @@ -74,6 +75,7 @@ from airflow.serialization.serialized_objects import ( BaseSerialization, LazyDeserializedDAG, SerializedDAG, + _has_kubernetes, create_scheduler_operator, ) from airflow.triggers.base import BaseTrigger @@ -718,3 +720,29 @@ class TestSerializedBaseOperator: next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT}, context={}, ) + + +class TestKubernetesImportAvoidance: + """Test that serialization doesn't import kubernetes unnecessarily.""" + + def test_has_kubernetes_no_import_when_not_needed(self): + """Ensure _has_kubernetes() doesn't import k8s when not already loaded.""" + # Remove kubernetes from sys.modules if present + k8s_modules = [m for m in list(sys.modules.keys()) if m.startswith("kubernetes")] + if k8s_modules: + pytest.skip("Kubernetes already imported, cannot test import avoidance") + + # Call _has_kubernetes() - should check sys.modules and return False without importing + result = _has_kubernetes() + + assert result is False + assert "kubernetes.client" not in sys.modules + + def test_has_kubernetes_uses_existing_import(self): + """Ensure _has_kubernetes() uses k8s when it's already imported.""" + pytest.importorskip("kubernetes") + + # Now k8s is imported, should return True + result = _has_kubernetes() + + assert result is True diff --git a/shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py b/shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py index 5709f85ec5c..5b7ff4e1c81 100644 --- a/shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py +++ b/shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py @@ -158,20 +158,29 @@ def reset_secrets_masker() -> None: _secrets_masker().reset_masker() +def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]: + """Check if object is V1EnvVar, avoiding unnecessary imports.""" + # Quick check: if k8s not imported, can't be a V1EnvVar instance + if "kubernetes.client" not in sys.modules: + return False + + # K8s is loaded, safe to get/cache the type + v1_type = _get_v1_env_var_type_cached() + return isinstance(v, v1_type) + + @cache -def _get_v1_env_var_type() -> type: +def _get_v1_env_var_type_cached() -> type: + """Get V1EnvVar type (cached, only called when k8s is already loaded).""" try: from kubernetes.client import V1EnvVar return V1EnvVar except ImportError: + # Shouldn't happen since we check sys.modules first return type("V1EnvVar", (), {}) -def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]: - return isinstance(v, _get_v1_env_var_type()) - - class SecretsMasker(logging.Filter): """Redact secrets from logs.""" diff --git a/shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py b/shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py index bada3b39281..ef94dc46e02 100644 --- a/shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py +++ b/shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py @@ -1110,3 +1110,44 @@ class TestSecretsMaskerMerge: assert final_dict["api"]["api_key"] == "new_api_key_67890" # User modification kept assert final_dict["api"]["timeout"] == 60 # User modification kept assert final_dict["app_name"] == "my_application" # Unchanged + + +class TestKubernetesImportAvoidance: + """Test that secrets masker doesn't import kubernetes unnecessarily.""" + + def test_no_k8s_import_when_not_needed(self): + """Ensure kubernetes is not imported when masking non-k8s secrets.""" + # Ensure kubernetes is not already imported + k8s_modules = [m for m in sys.modules if m.startswith("kubernetes")] + if k8s_modules: + pytest.skip("Kubernetes already imported, cannot test import avoidance") + + masker = SecretsMasker() + configure_secrets_masker_for_test(masker) + + masker.add_mask("test_secret", "password") + redacted = masker.redact({"password": "test_secret", "user": "admin"}) + + assert redacted["password"] == "***" + assert redacted["user"] == "admin" + + assert "kubernetes.client" not in sys.modules + + def test_k8s_objects_still_detected_when_imported(self): + """Ensure V1EnvVar objects are still properly detected when k8s is imported.""" + pytest.importorskip("kubernetes") + + from kubernetes.client import V1EnvVar + + # Create a V1EnvVar object with a sensitive name + env_var = V1EnvVar(name="password", value="secret123") + + masker = SecretsMasker() + configure_secrets_masker_for_test(masker) + + # Redact the V1EnvVar object - the name field is sensitive + redacted = masker.redact(env_var) + + # Should be redacted since "password" is a sensitive field name + assert redacted["value"] == "***" + assert redacted["name"] == "password"
