This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 701239abc3 Remove deprecated backcompat objects for KPO (#27518)
701239abc3 is described below
commit 701239abc372cb235b1c313198ae2ec429be4f91
Author: Daniel Standish <[email protected]>
AuthorDate: Sat Nov 5 10:43:51 2022 -0700
Remove deprecated backcompat objects for KPO (#27518)
These modules were all deprecated long ago -- actually in 2.0, I believe.
So I think it's fair to remove them.
---
airflow/providers/cncf/kubernetes/CHANGELOG.rst | 7 ++
.../providers/cncf/kubernetes/backcompat/pod.py | 116 ---------------------
.../kubernetes/backcompat/pod_runtime_info_env.py | 57 ----------
.../providers/cncf/kubernetes/backcompat/volume.py | 63 -----------
.../cncf/kubernetes/backcompat/volume_mount.py | 59 -----------
.../test_kubernetes_pod_operator_backcompat.py | 83 +--------------
6 files changed, 11 insertions(+), 374 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 6985a7261f..10b23c7c83 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -36,6 +36,13 @@ Drop support for providing ``resource`` as dict in
``KubernetesPodOperator``. Yo
Param ``node_selectors`` has been removed in ``KubernetesPodOperator``; use
``node_selector`` instead.
+The following backcompat modules for KubernetesPodOperator are removed and you
must now use the corresponding objects from the kubernetes library:
+* ``airflow.providers.cncf.kubernetes.backcompat.pod``
+* ``airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env``
+* ``airflow.providers.cncf.kubernetes.backcompat.volume``
+* ``airflow.providers.cncf.kubernetes.backcompat.volume_mount``
+
+
Features
~~~~~~~~
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod.py
b/airflow/providers/cncf/kubernetes/backcompat/pod.py
deleted file mode 100644
index 4bd67e8780..0000000000
--- a/airflow/providers/cncf/kubernetes/backcompat/pod.py
+++ /dev/null
@@ -1,116 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use
:mod:`kubernetes.client.models.V1ResourceRequirements`
-and :mod:`kubernetes.client.models.V1ContainerPort`.
-"""
-from __future__ import annotations
-
-import warnings
-
-from kubernetes.client import models as k8s
-
-warnings.warn(
- (
- "This module is deprecated. Please use
`kubernetes.client.models.V1ResourceRequirements`"
- " and `kubernetes.client.models.V1ContainerPort`."
- ),
- DeprecationWarning,
- stacklevel=2,
-)
-
-
-class Resources:
- """backwards compat for Resources."""
-
- __slots__ = (
- "request_memory",
- "request_cpu",
- "limit_memory",
- "limit_cpu",
- "limit_gpu",
- "request_ephemeral_storage",
- "limit_ephemeral_storage",
- )
-
- """
- :param request_memory: requested memory
- :param request_cpu: requested CPU number
- :param request_ephemeral_storage: requested ephemeral storage
- :param limit_memory: limit for memory usage
- :param limit_cpu: Limit for CPU used
- :param limit_gpu: Limits for GPU used
- :param limit_ephemeral_storage: Limit for ephemeral storage
- """
-
- def __init__(
- self,
- request_memory=None,
- request_cpu=None,
- request_ephemeral_storage=None,
- limit_memory=None,
- limit_cpu=None,
- limit_gpu=None,
- limit_ephemeral_storage=None,
- ):
- self.request_memory = request_memory
- self.request_cpu = request_cpu
- self.request_ephemeral_storage = request_ephemeral_storage
- self.limit_memory = limit_memory
- self.limit_cpu = limit_cpu
- self.limit_gpu = limit_gpu
- self.limit_ephemeral_storage = limit_ephemeral_storage
-
- def to_k8s_client_obj(self):
- """
- Converts to k8s object.
-
- @rtype: object
- """
- limits_raw = {
- "cpu": self.limit_cpu,
- "memory": self.limit_memory,
- "nvidia.com/gpu": self.limit_gpu,
- "ephemeral-storage": self.limit_ephemeral_storage,
- }
- requests_raw = {
- "cpu": self.request_cpu,
- "memory": self.request_memory,
- "ephemeral-storage": self.request_ephemeral_storage,
- }
-
- limits = {k: v for k, v in limits_raw.items() if v}
- requests = {k: v for k, v in requests_raw.items() if v}
- resource_req = k8s.V1ResourceRequirements(limits=limits,
requests=requests)
- return resource_req
-
-
-class Port:
- """POD port"""
-
- __slots__ = ("name", "container_port")
-
- def __init__(self, name=None, container_port=None):
- """Creates port"""
- self.name = name
- self.container_port = container_port
-
- def to_k8s_client_obj(self):
- """Converts to k8s object."""
- return k8s.V1ContainerPort(name=self.name,
container_port=self.container_port)
diff --git
a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
deleted file mode 100644
index 7fbd455b8e..0000000000
--- a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1EnvVar`.
-"""
-from __future__ import annotations
-
-import warnings
-
-import kubernetes.client.models as k8s
-
-warnings.warn(
- "This module is deprecated. Please use
`kubernetes.client.models.V1EnvVar`.",
- DeprecationWarning,
- stacklevel=2,
-)
-
-
-class PodRuntimeInfoEnv:
- """Defines Pod runtime information as environment variable."""
-
- def __init__(self, name, field_path):
- """
- Adds Kubernetes pod runtime information as environment variables such
as namespace, pod IP, pod name.
- Full list of options can be found in kubernetes documentation.
-
- :param name: the name of the environment variable
- :param field_path: path to pod runtime info. Ex: metadata.namespace |
status.podIP
- """
- self.name = name
- self.field_path = field_path
-
- def to_k8s_client_obj(self):
- """Converts to k8s object.
-
- :return: kubernetes.client.models.V1EnvVar
- """
- return k8s.V1EnvVar(
- name=self.name,
-
value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path=self.field_path)),
- )
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume.py
b/airflow/providers/cncf/kubernetes/backcompat/volume.py
deleted file mode 100644
index 50180ad05d..0000000000
--- a/airflow/providers/cncf/kubernetes/backcompat/volume.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module is deprecated. Please use
:mod:`kubernetes.client.models.V1Volume`."""
-from __future__ import annotations
-
-import warnings
-
-from kubernetes.client import models as k8s
-
-warnings.warn(
- "This module is deprecated. Please use
`kubernetes.client.models.V1Volume`.",
- DeprecationWarning,
- stacklevel=2,
-)
-
-
-class Volume:
- """Backward compatible Volume"""
-
- def __init__(self, name, configs):
- """Adds Kubernetes Volume to pod. allows pod to access features like
ConfigMaps
- and Persistent Volumes
-
- :param name: the name of the volume mount
- :param configs: dictionary of any features needed for volume. We
purposely keep this
- vague since there are multiple volume types with changing configs.
- """
- self.name = name
- self.configs = configs
-
- def to_k8s_client_obj(self) -> k8s.V1Volume:
- """
- Converts to k8s object.
-
- :return: Volume Mount k8s object
- """
- resp = k8s.V1Volume(name=self.name)
- for k, v in self.configs.items():
- snake_key = Volume._convert_to_snake_case(k)
- if hasattr(resp, snake_key):
- setattr(resp, snake_key, v)
- else:
- raise AttributeError(f"V1Volume does not have attribute {k}")
- return resp
-
- # source:
https://www.geeksforgeeks.org/python-program-to-convert-camel-case-string-to-snake-case/
- @staticmethod
- def _convert_to_snake_case(input_string):
- return "".join("_" + i.lower() if i.isupper() else i for i in
input_string).lstrip("_")
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
deleted file mode 100644
index b926a03355..0000000000
--- a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Classes for interacting with Kubernetes API"""
-from __future__ import annotations
-
-import warnings
-
-from kubernetes.client import models as k8s
-
-warnings.warn(
- "This module is deprecated. Please use
`kubernetes.client.models.V1VolumeMount`.",
- DeprecationWarning,
- stacklevel=2,
-)
-
-
-class VolumeMount:
- """Backward compatible VolumeMount"""
-
- __slots__ = ("name", "mount_path", "sub_path", "read_only")
-
- def __init__(self, name, mount_path, sub_path, read_only):
- """
- Initialize a Kubernetes Volume Mount. Used to mount pod level volumes
to
- running container.
-
- :param name: the name of the volume mount
- :param mount_path:
- :param sub_path: subpath within the volume mount
- :param read_only: whether to access pod with read-only mode
- """
- self.name = name
- self.mount_path = mount_path
- self.sub_path = sub_path
- self.read_only = read_only
-
- def to_k8s_client_obj(self) -> k8s.V1VolumeMount:
- """
- Converts to k8s object.
-
- :return: Volume Mount k8s object
- """
- return k8s.V1VolumeMount(
- name=self.name, mount_path=self.mount_path,
sub_path=self.sub_path, read_only=self.read_only
- )
diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
index 0a6d9656d8..6bf79a069c 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
@@ -19,9 +19,8 @@ from __future__ import annotations
import json
import sys
import unittest
-from copy import copy
from unittest import mock
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
import kubernetes.client.models as k8s
import pendulum
@@ -30,15 +29,10 @@ from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
from airflow.exceptions import AirflowException
-from airflow.kubernetes.pod import Port
-from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.volume_mount import VolumeMount
from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import
KubernetesPodOperator
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.utils.types import DagRunType
@@ -225,66 +219,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.expected_pod["spec"]["affinity"] = affinity
assert self.expected_pod == actual_pod
- def test_port(self):
- port = Port("http", 80)
-
- k = KubernetesPodOperator(
- namespace="default",
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo 10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- in_cluster=False,
- do_xcom_push=False,
- ports=[port],
- )
- context = create_context(k)
- k.execute(context=context)
- actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- self.expected_pod["spec"]["containers"][0]["ports"] = [{"name":
"http", "containerPort": 80}]
- assert self.expected_pod == actual_pod
-
- def test_volume_mount(self):
- with patch.object(PodManager, "log") as mock_logger:
- volume_mount = VolumeMount(
- "test-volume", mount_path="/tmp/test_volume", sub_path=None,
read_only=False
- )
-
- volume_config = {"persistentVolumeClaim": {"claimName":
"test-volume"}}
- volume = Volume(name="test-volume", configs=volume_config)
- args = [
- 'echo "retrieved from mount" > /tmp/test_volume/test.txt &&
cat /tmp/test_volume/test.txt'
- ]
- k = KubernetesPodOperator(
- namespace="default",
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=args,
- labels={"foo": "bar"},
- volume_mounts=[volume_mount],
- volumes=[volume],
- is_delete_operator_pod=False,
- name="test",
- task_id="task",
- in_cluster=False,
- do_xcom_push=False,
- )
- context = create_context(k)
- k.execute(context=context)
- mock_logger.info.assert_any_call("retrieved from mount")
- actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- expected_pod = copy(self.expected_pod)
- expected_pod["spec"]["containers"][0]["args"] = args
- expected_pod["spec"]["containers"][0]["volumeMounts"] = [
- {"name": "test-volume", "mountPath": "/tmp/test_volume",
"readOnly": False}
- ]
- expected_pod["spec"]["volumes"] = [
- {"name": "test-volume", "persistentVolumeClaim": {"claimName":
"test-volume"}}
- ]
- assert expected_pod == actual_pod
-
def test_run_as_user_root(self):
security_context = {
"securityContext": {
@@ -503,7 +437,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
"ENV1": "val1",
"ENV2": "val2",
},
- pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
labels={"foo": "bar"},
name="test",
task_id="task",
@@ -519,7 +452,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.expected_pod["spec"]["containers"][0]["env"] = [
{"name": "ENV1", "value": "val1"},
{"name": "ENV2", "value": "val2"},
- {"name": "ENV3", "valueFrom": {"fieldRef": {"fieldPath":
"status.podIP"}}},
]
assert self.expected_pod == actual_pod
@@ -571,9 +503,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
args=["echo 10"],
)
- volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
- volume = Volume(name="test-volume", configs=volume_config)
-
expected_init_container = {
"name": "init-container",
"image": "ubuntu:16.04",
@@ -591,16 +520,12 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
labels={"foo": "bar"},
name="test",
task_id="task",
- volumes=[volume],
init_containers=[init_container],
in_cluster=False,
do_xcom_push=False,
)
context = create_context(k)
- k.execute(context)
- actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ pod = k.build_pod_request_obj(context)
+ actual_pod = self.api_client.sanitize_for_serialization(pod)
self.expected_pod["spec"]["initContainers"] = [expected_init_container]
- self.expected_pod["spec"]["volumes"] = [
- {"name": "test-volume", "persistentVolumeClaim": {"claimName":
"test-volume"}}
- ]
- assert self.expected_pod == actual_pod
+ assert actual_pod == self.expected_pod