This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1f5a5425e Remove deprecated features from KubernetesHook (#31402)
a1f5a5425e is described below
commit a1f5a5425e65c40e9baaf5eb4faeaed01cee3569
Author: Daniel Standish <[email protected]>
AuthorDate: Fri May 19 04:15:23 2023 -0700
Remove deprecated features from KubernetesHook (#31402)
* Remove deprecated features from KubernetesHook
---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 58 +++-------------
airflow/providers/cncf/kubernetes/operators/pod.py | 36 +---------
.../providers/cncf/kubernetes/utils/pod_manager.py | 14 +---
kubernetes_tests/test_kubernetes_pod_operator.py | 2 -
.../cncf/kubernetes/decorators/test_kubernetes.py | 12 +---
.../cncf/kubernetes/hooks/test_kubernetes.py | 48 +-------------
.../cncf/kubernetes/operators/test_pod.py | 77 ++++++++++------------
7 files changed, 51 insertions(+), 196 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 8057593071..bdd0cbc02c 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -17,9 +17,7 @@
from __future__ import annotations
import contextlib
-import json
import tempfile
-import warnings
from typing import TYPE_CHECKING, Any, Generator
from asgiref.sync import sync_to_async
@@ -30,7 +28,7 @@ from kubernetes_asyncio import client as async_client, config as async_config
from urllib3.exceptions import HTTPError
from airflow.compat.functools import cached_property
-from airflow.exceptions import AirflowException, AirflowNotFoundException, AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.models import Connection
@@ -101,12 +99,6 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
"cluster_context": StringField(lazy_gettext("Cluster context"),
widget=BS3TextFieldWidget()),
"disable_verify_ssl": BooleanField(lazy_gettext("Disable SSL")),
"disable_tcp_keepalive": BooleanField(lazy_gettext("Disable TCP
keepalive")),
- "xcom_sidecar_container_image": StringField(
- lazy_gettext("XCom sidecar image"), widget=BS3TextFieldWidget()
- ),
- "xcom_sidecar_container_resources": StringField(
- lazy_gettext("XCom sidecar resources (JSON format)"),
widget=BS3TextFieldWidget()
- ),
}
@staticmethod
@@ -303,7 +295,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
response = api.create_namespaced_custom_object(
group=group,
version=version,
- namespace=namespace or self.get_namespace(),
+ namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
body=body_dict,
)
@@ -327,7 +319,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
response = api.get_namespaced_custom_object(
group=group,
version=version,
- namespace=namespace or self.get_namespace(),
+ namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
name=name,
)
@@ -349,52 +341,18 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
return api.delete_namespaced_custom_object(
group=group,
version=version,
- namespace=namespace or self.get_namespace(),
+ namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
plural=plural,
name=name,
**kwargs,
)
def get_namespace(self) -> str | None:
- """
- Returns the namespace defined in the connection or 'default'.
-
- TODO: in provider version 6.0, return None when namespace not defined
in connection
- """
- namespace = self._get_namespace()
- if self.conn_id and not namespace:
- warnings.warn(
- "Airflow connection defined but namespace is not set;
returning 'default'. In "
- "cncf.kubernetes provider version 6.0 we will return None when
namespace is "
- "not defined in the connection so that it's clear whether user
intends 'default' or "
- "whether namespace is unset (which is required in order to
apply precedence logic in "
- "KubernetesPodOperator).",
- AirflowProviderDeprecationWarning,
- )
- return "default"
- return namespace
-
- def _get_namespace(self) -> str | None:
- """
- Returns the namespace that defined in the connection
-
- TODO: in provider version 6.0, get rid of this method and make it the
behavior of get_namespace.
- """
+ """Returns the namespace that defined in the connection"""
if self.conn_id:
return self._get_field("namespace")
return None
- def get_xcom_sidecar_container_image(self):
- """Returns the xcom sidecar image that defined in the connection"""
- return self._get_field("xcom_sidecar_container_image")
-
- def get_xcom_sidecar_container_resources(self):
- """Returns the xcom sidecar resources that defined in the connection"""
- field = self._get_field("xcom_sidecar_container_resources")
- if not field:
- return None
- return json.loads(field)
-
def get_pod_log_stream(
self,
pod_name: str,
@@ -415,7 +373,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
self.core_v1_client.read_namespaced_pod_log,
name=pod_name,
container=container,
- namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
+ namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
),
)
@@ -436,7 +394,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
name=pod_name,
container=container,
_preload_content=False,
- namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
+ namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
)
def get_pod(self, name: str, namespace: str) -> V1Pod:
@@ -460,7 +418,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
:param watch: Watch for changes to the described resources and return
them as a stream
"""
return self.core_v1_client.list_namespaced_pod(
- namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
+ namespace=namespace or self.get_namespace() or self.DEFAULT_NAMESPACE,
watch=watch,
label_selector=label_selector,
_preload_content=False,
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index e3f363e7c2..d30794b066 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -20,11 +20,9 @@ from __future__ import annotations
import json
import logging
-import os
import re
import secrets
import string
-import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Any, Sequence
@@ -34,7 +32,7 @@ from slugify import slugify
from urllib3.exceptions import HTTPError
from airflow.compat.functools import cached_property
-from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
@@ -473,14 +471,6 @@ class KubernetesPodOperator(BaseOperator):
)
return hook
- def get_hook(self):
- warnings.warn(
- "get_hook is deprecated. Please use hook instead.",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- return self.hook
-
@cached_property
def client(self) -> CoreV1Api:
return self.hook.core_v1_client
@@ -589,20 +579,6 @@ class KubernetesPodOperator(BaseOperator):
)
self.invoke_defer_method()
- def convert_config_file_to_dict(self):
- """Converts passed config_file to dict format."""
- warnings.warn(
- "This method is deprecated and will be removed in a future
version.",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- config_file = self.config_file if self.config_file else
os.environ.get(KUBE_CONFIG_ENV_VAR)
- if config_file:
- with open(config_file) as f:
- self._config_dict = yaml.safe_load(f)
- else:
- self._config_dict = None
-
def invoke_defer_method(self):
"""Method to easily redefine triggers which are being used in child
classes."""
trigger_start_time = utcnow()
@@ -851,9 +827,7 @@ class KubernetesPodOperator(BaseOperator):
pod.metadata.name = _add_pod_suffix(pod_name=pod.metadata.name)
if not pod.metadata.namespace:
- # todo: replace with call to `hook.get_namespace` in 6.0, when it
doesn't default to `default`.
- # if namespace not actually defined in hook, we want to check k8s
if in cluster
- hook_namespace = self.hook._get_namespace()
+ hook_namespace = self.hook.get_namespace()
pod_namespace = self.namespace or hook_namespace or self._incluster_namespace or "default"
pod.metadata.namespace = pod_namespace
@@ -862,11 +836,7 @@ class KubernetesPodOperator(BaseOperator):
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
- pod = xcom_sidecar.add_xcom_sidecar(
- pod,
- sidecar_container_image=self.hook.get_xcom_sidecar_container_image(),
- sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(),
- )
+ pod = xcom_sidecar.add_xcom_sidecar(pod)
labels = self._get_ti_pod_labels(context)
self.log.info("Building pod %s with labels: %s", pod.metadata.name,
labels)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index bd431e078b..daa19c2c2d 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -93,18 +93,8 @@ class PodOperatorHookProtocol(Protocol):
def get_pod(self, name: str, namespace: str) -> V1Pod:
"""Read pod object from kubernetes API."""
- def _get_namespace(self) -> str | None:
- """
- Returns the namespace that defined in the connection
-
- TODO: in provider version 6.0, get rid of this method and make it the
behavior of get_namespace.
- """
-
- def get_xcom_sidecar_container_image(self) -> str | None:
- """Returns the xcom sidecar image that defined in the connection"""
-
- def get_xcom_sidecar_container_resources(self) -> str | None:
- """Returns the xcom sidecar resources that defined in the connection"""
+ def get_namespace(self) -> str | None:
+ """Returns the namespace that defined in the connection"""
def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 4afd4507fd..e650932b53 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -896,8 +896,6 @@ class TestKubernetesPodOperatorSystem:
# todo: This isn't really a system test
await_xcom_sidecar_container_start_mock.return_value = None
hook_mock.return_value.is_in_cluster = False
- hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
- hook_mock.return_value.get_xcom_sidecar_container_resources.return_value = None
hook_mock.return_value.get_connection.return_value = Connection(conn_id="kubernetes_default")
extract_xcom_mock.return_value = "{}"
path = sys.path[0] + "/tests/kubernetes/pod.yaml"
diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
index b7f79279d4..3559196243 100644
--- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py
@@ -31,8 +31,6 @@ KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.pod"
POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
-XCOM_IMAGE = "XCOM_IMAGE"
-
@pytest.fixture(autouse=True)
def mock_create_pod() -> mock.Mock:
@@ -127,12 +125,6 @@ def test_kubernetes_with_input_output(
dr = dag_maker.create_dagrun()
(ti,) = dr.task_instances
- mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE
- mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = {
- "requests": {"cpu": "1m", "memory": "10Mi"},
- "limits": {"cpu": "1m", "memory": "50Mi"},
- }
-
dag.get_task("my_task_id").execute(context=ti.get_template_context(session=session))
mock_hook.assert_called_once_with(
@@ -142,8 +134,6 @@ def test_kubernetes_with_input_output(
config_file="/tmp/fake_file",
)
assert mock_create_pod.call_count == 1
- assert mock_hook.return_value.get_xcom_sidecar_container_image.call_count == 1
- assert mock_hook.return_value.get_xcom_sidecar_container_resources.call_count == 1
containers = mock_create_pod.call_args[1]["pod"].spec.containers
@@ -162,7 +152,7 @@ def test_kubernetes_with_input_output(
assert decoded_input == {"args": ("arg1", "arg2"), "kwargs": {"kwarg1":
"kwarg1"}}
# Second container is xcom image
- assert containers[1].image == XCOM_IMAGE
+ assert containers[1].image == "alpine"
assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom"
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index 63740af2e4..6e1b9615e1 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -93,20 +93,6 @@ class TestKubernetesHook:
("disable_verify_ssl_empty", {"disable_verify_ssl": ""}),
("disable_tcp_keepalive", {"disable_tcp_keepalive": True}),
("disable_tcp_keepalive_empty", {"disable_tcp_keepalive": ""}),
- ("sidecar_container_image", {"xcom_sidecar_container_image":
"private.repo.com/alpine:3.16"}),
- ("sidecar_container_image_empty", {"xcom_sidecar_container_image":
""}),
- (
- "sidecar_container_resources",
- {
- "xcom_sidecar_container_resources": json.dumps(
- {
- "requests": {"cpu": "1m", "memory": "10Mi"},
- "limits": {"cpu": "1m", "memory": "50Mi"},
- }
- ),
- },
- ),
- ("sidecar_container_resources_empty",
{"xcom_sidecar_container_resources": ""}),
]:
db.merge_conn(Connection(conn_type="kubernetes", conn_id=conn_id,
extra=json.dumps(extra)))
@@ -348,7 +334,7 @@ class TestKubernetesHook:
(
pytest.param(None, None, id="no-conn-id"),
pytest.param("with_namespace", "mock_namespace",
id="conn-with-namespace"),
- pytest.param("default_kube_config", "default",
id="conn-without-namespace"),
+ pytest.param("default_kube_config", None,
id="conn-without-namespace"),
),
)
def test_get_namespace(self, conn_id, expected):
@@ -361,38 +347,6 @@ class TestKubernetesHook:
"and rename _get_namespace to get_namespace."
)
- @pytest.mark.parametrize(
- "conn_id, expected",
- (
- pytest.param("sidecar_container_image",
"private.repo.com/alpine:3.16", id="sidecar-with-image"),
- pytest.param("sidecar_container_image_empty", None,
id="sidecar-without-image"),
- ),
- )
- def test_get_xcom_sidecar_container_image(self, conn_id, expected):
- hook = KubernetesHook(conn_id=conn_id)
- assert hook.get_xcom_sidecar_container_image() == expected
-
- @pytest.mark.parametrize(
- "conn_id, expected",
- (
- pytest.param(
- "sidecar_container_resources",
- {
- "requests": {"cpu": "1m", "memory": "10Mi"},
- "limits": {
- "cpu": "1m",
- "memory": "50Mi",
- },
- },
- id="sidecar-with-resources",
- ),
- pytest.param("sidecar_container_resources_empty", None,
id="sidecar-without-resources"),
- ),
- )
- def test_get_xcom_sidecar_container_resources(self, conn_id, expected):
- hook = KubernetesHook(conn_id=conn_id)
- assert hook.get_xcom_sidecar_container_resources() == expected
-
@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
def test_client_types(self, mock_kube_config_merger,
mock_kube_config_loader):
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 0ee1699a1c..697044342c 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -17,7 +17,7 @@
from __future__ import annotations
import re
-from contextlib import nullcontext
+from contextlib import contextmanager, nullcontext
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -32,11 +32,9 @@ from airflow.exceptions import AirflowException, AirflowSkipException, TaskDefer
from airflow.kubernetes.secret import Secret
from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.models.xcom import XCom
-from airflow.providers.cncf.kubernetes.operators.pod import (
- KubernetesPodOperator,
- _optionally_suppress,
-)
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator, _optionally_suppress
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
+from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
@@ -59,6 +57,14 @@ TEST_NAME = "test-pod"
TEST_SUCCESS_MESSAGE = "All containers inside pod have started successfully."
+@contextmanager
+def temp_override_attr(obj, attr, val):
+ orig = getattr(obj, attr)
+ setattr(obj, attr, val)
+ yield
+ setattr(obj, attr, orig)
+
+
@pytest.fixture(scope="function", autouse=True)
def clear_db():
db.clear_db_dags()
@@ -526,31 +532,27 @@ class TestKubernetesPodOperator:
)
mock_find.assert_called_once_with("default", context=context)
- @patch(HOOK_CLASS)
- def test_xcom_sidecar_container_image_default(self, hook_mock):
- hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
- k = KubernetesPodOperator(
- name="test",
- task_id="task",
- do_xcom_push=True,
- )
- pod = k.build_pod_request_obj(create_context(k))
- assert pod.spec.containers[1].image == "alpine"
+ def test_xcom_sidecar_container_image_custom(self):
+ image = "private.repo/alpine:3.13"
+ with temp_override_attr(PodDefaults.SIDECAR_CONTAINER, "image", image):
+ k = KubernetesPodOperator(
+ name="test",
+ task_id="task",
+ do_xcom_push=True,
+ )
+ pod = k.build_pod_request_obj(create_context(k))
+ assert pod.spec.containers[1].image == image
- @patch(HOOK_CLASS)
- def test_xcom_sidecar_container_image_custom(self, hook_mock):
- hook_mock.return_value.get_xcom_sidecar_container_image.return_value = "private.repo/alpine:3.13"
+ def test_xcom_sidecar_container_image_default(self):
k = KubernetesPodOperator(
name="test",
task_id="task",
do_xcom_push=True,
)
pod = k.build_pod_request_obj(create_context(k))
- assert pod.spec.containers[1].image == "private.repo/alpine:3.13"
+ assert pod.spec.containers[1].image == "alpine"
- @patch(HOOK_CLASS)
- def test_xcom_sidecar_container_resources_default(self, hook_mock):
- hook_mock.return_value.get_xcom_sidecar_container_resources.return_value = None
+ def test_xcom_sidecar_container_resources_default(self):
k = KubernetesPodOperator(
name="test",
task_id="task",
@@ -564,22 +566,19 @@ class TestKubernetesPodOperator:
},
)
- @patch(HOOK_CLASS)
- def test_xcom_sidecar_container_resources_custom(self, hook_mock):
- hook_mock.return_value.get_xcom_sidecar_container_resources.return_value = {
- "requests": {"cpu": "1m", "memory": "10Mi"},
- "limits": {"cpu": "10m", "memory": "50Mi"},
- }
- k = KubernetesPodOperator(
- name="test",
- task_id="task",
- do_xcom_push=True,
- )
- pod = k.build_pod_request_obj(create_context(k))
- assert pod.spec.containers[1].resources == {
+ def test_xcom_sidecar_container_resources_custom(self):
+ resources = {
"requests": {"cpu": "1m", "memory": "10Mi"},
"limits": {"cpu": "10m", "memory": "50Mi"},
}
+ with temp_override_attr(PodDefaults.SIDECAR_CONTAINER, "resources", resources):
+ k = KubernetesPodOperator(
+ name="test",
+ task_id="task",
+ do_xcom_push=True,
+ )
+ pod = k.build_pod_request_obj(create_context(k))
+ assert pod.spec.containers[1].resources == resources
def test_image_pull_policy_correctly_set(self):
k = KubernetesPodOperator(
@@ -1422,9 +1421,7 @@ class TestKubernetesPodOperatorAsync:
else:
mocked_extract.assert_not_called()
- @patch(HOOK_CLASS)
- def test_async_xcom_sidecar_container_image_default_should_execute_successfully(self, hook_mock):
- hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
+ def test_async_xcom_sidecar_container_image_default_should_execute_successfully(self):
k = KubernetesPodOperator(
name=TEST_NAME,
task_id="task",
@@ -1434,9 +1431,7 @@ class TestKubernetesPodOperatorAsync:
pod = k.build_pod_request_obj(create_context(k))
assert pod.spec.containers[1].image == "alpine"
- @patch(HOOK_CLASS)
- def test_async_xcom_sidecar_container_resources_default_should_execute_successfully(self, hook_mock):
- hook_mock.return_value.get_xcom_sidecar_container_resources.return_value = None
+ def test_async_xcom_sidecar_container_resources_default_should_execute_successfully(self):
k = KubernetesPodOperator(
name=TEST_NAME,
task_id="task",