This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6d85a0466d9 Remove deprecations cncf.kubernetes (#43689)
6d85a0466d9 is described below
commit 6d85a0466d91d501af87c8904b902ea92cee466d
Author: rom sharon <[email protected]>
AuthorDate: Sat Nov 9 22:43:40 2024 +0200
Remove deprecations cncf.kubernetes (#43689)
* change default namespace value to be None
* revert ui change
* remove deprecations from cncf.kubernetes
* add changelog
* add list of changes in changelog
* Update providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
* fix tests
* fix tests
* update changelog, remove deprecation warnings from tests
* add todo comment for removing import
* remove kubernetes_pod as deprecated module
* remove unnecessary test, add import for deprecation
* fix tests
* fix tests and imports
* fix tests
* add ignore re-def in correct place
---------
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/cli/commands/kubernetes_command.py | 4 +-
.../operators.rst | 2 +-
kubernetes_tests/test_kubernetes_pod_operator.py | 18 --
.../airflow/providers/amazon/aws/operators/eks.py | 6 +-
.../providers/cncf/kubernetes/CHANGELOG.rst | 28 ++
.../cncf/kubernetes/kubernetes_helper_functions.py | 41 ---
.../cncf/kubernetes/operators/kubernetes_pod.py | 31 --
.../providers/cncf/kubernetes/operators/pod.py | 32 +--
.../cncf/kubernetes/operators/spark_kubernetes.py | 2 -
.../providers/cncf/kubernetes/pod_generator.py | 118 --------
.../cncf/kubernetes/pod_launcher_deprecated.py | 320 ---------------------
.../cncf/kubernetes/triggers/kubernetes_pod.py | 31 --
.../providers/cncf/kubernetes/triggers/pod.py | 23 +-
.../providers/cncf/kubernetes/utils/pod_manager.py | 23 +-
.../executors/test_kubernetes_executor.py | 7 +-
.../tests/cncf/kubernetes/models/test_secret.py | 5 +-
.../tests/cncf/kubernetes/operators/test_pod.py | 35 ---
.../kubernetes/test_kubernetes_helper_functions.py | 21 +-
.../tests/cncf/kubernetes/test_pod_generator.py | 105 +------
.../tests/cncf/kubernetes/triggers/test_pod.py | 1 -
.../cncf/kubernetes/utils/test_pod_manager.py | 16 --
.../cloud/triggers/test_kubernetes_engine.py | 8 +-
.../in_container/run_provider_yaml_files_check.py | 2 -
tests/always/test_project_structure.py | 4 -
24 files changed, 61 insertions(+), 822 deletions(-)
diff --git a/airflow/cli/commands/kubernetes_command.py
b/airflow/cli/commands/kubernetes_command.py
index 2a6fccf14d1..eab11133c9d 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -31,7 +31,7 @@ from airflow.models import DagRun, TaskInstance
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import
KubeConfig
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
-from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
create_pod_id
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
create_unique_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.cli import get_dag
@@ -59,7 +59,7 @@ def generate_pod_yaml(args):
pod = PodGenerator.construct_pod(
dag_id=args.dag_id,
task_id=ti.task_id,
- pod_id=create_pod_id(args.dag_id, ti.task_id),
+ pod_id=create_unique_id(args.dag_id, ti.task_id),
try_number=ti.try_number,
kube_image=kube_config.kube_image,
date=ti.execution_date,
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 2268a8655c9..dfc54e092fe 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -206,7 +206,7 @@ Read more on termination-log `here
<https://kubernetes.io/docs/tasks/debug/debug
KubernetesPodOperator callbacks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-The
:class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`
supports different
+The
:class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator`
supports different
callbacks that can be used to trigger actions during the lifecycle of the pod.
In order to use them, you need to
create a subclass of
:class:`~airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback`
and override
the callbacks methods you want to use. Then you can pass your callback class
to the operator using the ``callbacks``
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 13bf99d728d..99e30c710d2 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -1201,24 +1201,6 @@ class TestKubernetesPodOperatorSystem:
self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
assert self.expected_pod["spec"] == actual_pod["spec"]
- def test_progess_call(self, mock_get_connection):
- progress_callback = MagicMock()
- k = KubernetesPodOperator(
- namespace="default",
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo 10"],
- labels=self.labels,
- task_id=str(uuid4()),
- in_cluster=False,
- do_xcom_push=False,
- get_logs=True,
- progress_callback=progress_callback,
- )
- context = create_context(k)
- k.execute(context)
- progress_callback.assert_called()
-
def test_changing_base_container_name_no_logs(self, mock_get_connection):
"""
This test checks BOTH a modified base container name AND the
get_logs=False flow,
diff --git a/providers/src/airflow/providers/amazon/aws/operators/eks.py
b/providers/src/airflow/providers/amazon/aws/operators/eks.py
index fa82cdcc72d..d90103362d0 100644
--- a/providers/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/src/airflow/providers/amazon/aws/operators/eks.py
@@ -45,8 +45,10 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager
import OnFinishAction
try:
from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator
except ImportError:
- # preserve backward compatibility for older versions of cncf.kubernetes
provider
- from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import
KubernetesPodOperator
+ # preserve backward compatibility for older versions of cncf.kubernetes
provider, remove this when minimum cncf.kubernetes provider is 10.0
+ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
# type: ignore[no-redef]
+ KubernetesPodOperator,
+ )
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git a/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
b/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 28417d6ac3d..abeebdc7d2e 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -31,6 +31,34 @@ Changelog
main
.....
+All deprecated classes, parameters and features have been removed from the
Kubernetes provider package.
+The following breaking changes were introduced:
+
+* Helpers
+ * Remove ``add_pod_suffix`` method from ``kubernetes_helper_functions.py``.
Use ``add_unique_suffix`` instead.
+ * Remove ``make_unique_pod_id`` method from ``PodGenerator``. Use
``add_unique_suffix`` in ``kubernetes_helper_functions`` instead.
+ * Remove ``create_pod_id`` method from ``kubernetes_helper_functions.py``.
Use ``create_unique_id`` instead.
+ * Remove ``gen_pod`` method from ``PodGenerator``.
+ * Remove ``add_xcom_sidecar`` method from ``PodGenerator``. Use
``airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar``
instead.
+ * Remove the option of using a dictionary for the executor_config in the
``from_obj`` function of ``PodGenerator``. Use a
``kubernetes.client.models.V1Pod`` class with a "pod_override" key instead.
+ * Remove ``from_legacy_obj`` method from ``PodGenerator``.
+ * Remove ``airflow.providers.cncf.kubernetes.pod_launcher_deprecated``
module. Use ``airflow.providers.cncf.kubernetes.utils.pod_manager`` instead.
+
+* Operators
+ * Remove ``airflow.providers.cncf.kubernetes.operators.kubernetes_pod``.
Use ``airflow.providers.cncf.kubernetes.operators.pod`` instead.
+ * Remove ``is_delete_operator_pod`` parameter from
``KubernetesPodOperator``. Use ``on_finish_action`` instead.
+ * Remove ``progress_callback`` parameter from ``KubernetesPodOperator``.
Use ``callbacks`` instead.
+ * Remove ``execute_complete`` method from ``KubernetesPodOperator``. Use
``trigger_reentry`` instead.
+ * Remove ``xcom_push`` parameter from ``SparkKubernetesOperator``. Use
``do_xcom_push`` instead.
+
+* Triggers
+ * Remove ``should_delete_pod`` parameter from ``KubernetesPodTrigger``. Use
``on_finish_action`` instead.
+
+* Utils
+ * Remove ``progress_callback`` parameter from ``PodManager``.
+ * Remove ``follow_container_logs`` method from ``PodManager``. Use
``fetch_container_logs`` instead.
+
+
.. warning::
Set the default value of ``namespace`` in ``@task.kubernetes`` to ``None``,
so it uses the cluster namespace when ``in_cluster`` is True. Be sure to
specify a namespace when using this decorator. To retain the previous behavior,
set ``namespace="default"``
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
b/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
index c603f8a178b..813edcc6a6d 100644
---
a/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
+++
b/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
@@ -23,12 +23,10 @@ from functools import cache
from typing import TYPE_CHECKING
import pendulum
-from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify
from airflow.configuration import conf
-from airflow.exceptions import AirflowProviderDeprecationWarning
if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
@@ -62,22 +60,6 @@ def add_unique_suffix(*, name: str, rand_len: int = 8,
max_len: int = POD_NAME_M
return name[: max_len - len(suffix)].strip("-.") + suffix
-@deprecated(
- reason="This function is deprecated. Please use `add_unique_suffix`",
- category=AirflowProviderDeprecationWarning,
-)
-def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int =
POD_NAME_MAX_LENGTH) -> str:
- """
- Add random string to pod name while staying under max length.
-
- :param pod_name: name of the pod
- :param rand_len: length of the random string to append
- :param max_len: maximum length of the pod name
- :meta private:
- """
- return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)
-
-
def create_unique_id(
dag_id: str | None = None,
task_id: str | None = None,
@@ -110,29 +92,6 @@ def create_unique_id(
return base_name
-@deprecated(
- reason="This function is deprecated. Please use `create_unique_id`.",
- category=AirflowProviderDeprecationWarning,
-)
-def create_pod_id(
- dag_id: str | None = None,
- task_id: str | None = None,
- *,
- max_length: int = POD_NAME_MAX_LENGTH,
- unique: bool = True,
-) -> str:
- """
- Generate unique pod ID given a dag_id and / or task_id.
-
- :param dag_id: DAG ID
- :param task_id: Task ID
- :param max_length: max number of characters
- :param unique: whether a random string suffix should be added
- :return: A valid identifier for a kubernetes pod name
- """
- return create_unique_id(dag_id=dag_id, task_id=task_id,
max_length=max_length, unique=unique)
-
-
def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
"""Build a TaskInstanceKey based on pod annotations."""
log.debug("Creating task key for annotations %s", annotations)
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
b/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
deleted file mode 100644
index 3b3e6f7d952..00000000000
---
a/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module is deprecated. Please use
:mod:`airflow.providers.cncf.kubernetes.operators.pod` instead."""
-
-from __future__ import annotations
-
-import warnings
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.operators.pod import * # noqa: F403
-
-warnings.warn(
- "This module is deprecated. Please use
`airflow.providers.cncf.kubernetes.operators.pod` instead.",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
-)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 62f08439d41..2c99126230c 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -26,7 +26,6 @@ import os
import re
import shlex
import string
-import warnings
from collections.abc import Container, Mapping
from contextlib import AbstractContextManager
from enum import Enum
@@ -35,7 +34,6 @@ from typing import TYPE_CHECKING, Any, Callable, Iterable,
Literal, Sequence
import kubernetes
import tenacity
-from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.client.exceptions import ApiException
from kubernetes.stream import stream
@@ -44,7 +42,6 @@ from urllib3.exceptions import HTTPError
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
- AirflowProviderDeprecationWarning,
AirflowSkipException,
TaskDeferred,
)
@@ -215,18 +212,12 @@ class KubernetesPodOperator(BaseOperator):
:param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless its state; if
"delete_succeeded_pod",
only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
- :param is_delete_operator_pod: What to do when the pod reaches its final
- state, or the execution is interrupted. If True (default), delete the
- pod; if False, leave the pod.
- Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the
base container.
Default value is "File"
:param active_deadline_seconds: The active_deadline_seconds which
translates to active_deadline_seconds
in V1PodSpec.
:param callbacks: KubernetesPodOperatorCallback instance contains the
callbacks methods on different step
of KubernetesPodOperator.
- :param progress_callback: Callback function for receiving k8s container
logs.
- `progress_callback` is deprecated, please use :param `callbacks`
instead.
:param logging_interval: max time in seconds that task should be in
deferred state before
resuming to fetch the latest logs. If ``None``, then the task will
remain in deferred state until pod
is done, and no logs will be visible until that time.
@@ -404,19 +395,8 @@ class KubernetesPodOperator(BaseOperator):
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
self.log_pod_spec_on_failure = log_pod_spec_on_failure
- if is_delete_operator_pod is not None:
- warnings.warn(
- "`is_delete_operator_pod` parameter is deprecated, please use
`on_finish_action`",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- self.on_finish_action = (
- OnFinishAction.DELETE_POD if is_delete_operator_pod else
OnFinishAction.KEEP_POD
- )
- self.is_delete_operator_pod = is_delete_operator_pod
- else:
- self.on_finish_action = OnFinishAction(on_finish_action)
- self.is_delete_operator_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
+ self.on_finish_action = OnFinishAction(on_finish_action)
+ self.is_delete_operator_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy
self.active_deadline_seconds = active_deadline_seconds
self.logging_interval = logging_interval
@@ -512,9 +492,7 @@ class KubernetesPodOperator(BaseOperator):
@cached_property
def pod_manager(self) -> PodManager:
- return PodManager(
- kube_client=self.client, callbacks=self.callbacks,
progress_callback=self._progress_callback
- )
+ return PodManager(kube_client=self.client, callbacks=self.callbacks)
@cached_property
def hook(self) -> PodOperatorHookProtocol:
@@ -1161,10 +1139,6 @@ class KubernetesPodOperator(BaseOperator):
pod = self.build_pod_request_obj()
print(yaml.dump(prune_dict(pod.to_dict(), mode="strict")))
- @deprecated(reason="use `trigger_reentry` instead.",
category=AirflowProviderDeprecationWarning)
- def execute_complete(self, context: Context, event: dict, **kwargs):
- return self.trigger_reentry(context=context, event=event)
-
def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) ->
k8s.V1Pod:
"""
Patch or delete the existing pod with duplicate labels.
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index acab68d85ff..ad529290d8e 100644
---
a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++
b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -98,8 +98,6 @@ class SparkKubernetesOperator(KubernetesPodOperator):
random_name_suffix: bool = True,
**kwargs,
) -> None:
- if kwargs.get("xcom_push") is not None:
- raise AirflowException("'xcom_push' was deprecated, use
'do_xcom_push' instead")
super().__init__(name=name, **kwargs)
self.image = image
self.code_path = code_path
diff --git a/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
b/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
index cf11db539e3..78838d9b444 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
@@ -34,23 +34,16 @@ from typing import TYPE_CHECKING
import re2
from dateutil import parser
-from deprecated import deprecated
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient
from airflow.exceptions import (
AirflowConfigException,
AirflowException,
- AirflowProviderDeprecationWarning,
)
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
POD_NAME_MAX_LENGTH,
add_unique_suffix,
- rand_str,
-)
-from airflow.providers.cncf.kubernetes.pod_generator_deprecated import (
- PodDefaults as PodDefaultsDeprecated,
- PodGenerator as PodGeneratorDeprecated,
)
from airflow.utils import yaml
from airflow.utils.hashlib_wrapper import md5
@@ -155,40 +148,6 @@ class PodGenerator:
# Attach sidecar
self.extract_xcom = extract_xcom
- @deprecated(
- reason="This method is deprecated and will be removed in the future
releases",
- category=AirflowProviderDeprecationWarning,
- )
- def gen_pod(self) -> k8s.V1Pod:
- """Generate pod."""
- result = self.ud_pod
-
- result.metadata.name = add_unique_suffix(name=result.metadata.name)
-
- if self.extract_xcom:
- result = self.add_xcom_sidecar(result)
-
- return result
-
- @staticmethod
- @deprecated(
- reason=(
- "This function is deprecated. "
- "Please use
airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead"
- ),
- category=AirflowProviderDeprecationWarning,
- )
- def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
- """Add sidecar."""
- pod_cp = copy.deepcopy(pod)
- pod_cp.spec.volumes = pod.spec.volumes or []
- pod_cp.spec.volumes.insert(0, PodDefaultsDeprecated.VOLUME)
- pod_cp.spec.containers[0].volume_mounts =
pod_cp.spec.containers[0].volume_mounts or []
- pod_cp.spec.containers[0].volume_mounts.insert(0,
PodDefaultsDeprecated.VOLUME_MOUNT)
- pod_cp.spec.containers.append(PodDefaultsDeprecated.SIDECAR_CONTAINER)
-
- return pod_cp
-
@staticmethod
def from_obj(obj) -> dict | k8s.V1Pod | None:
"""Convert to pod from obj."""
@@ -210,57 +169,11 @@ class PodGenerator:
if isinstance(k8s_object, k8s.V1Pod):
return k8s_object
- elif isinstance(k8s_legacy_object, dict):
- warnings.warn(
- "Using a dictionary for the executor_config is deprecated and
will soon be removed. "
- 'Please use a `kubernetes.client.models.V1Pod` class with a
"pod_override" key'
- " instead. ",
- category=AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- return PodGenerator.from_legacy_obj(obj)
else:
raise TypeError(
"Cannot convert a non-kubernetes.client.models.V1Pod object
into a KubernetesExecutorConfig"
)
- @staticmethod
- def from_legacy_obj(obj) -> k8s.V1Pod | None:
- """Convert to pod from obj."""
- if obj is None:
- return None
-
- # We do not want to extract constant here from ExecutorLoader because
it is just
- # A name in dictionary rather than executor selection mechanism and it
causes cyclic import
- namespaced = obj.get("KubernetesExecutor", {})
-
- if not namespaced:
- return None
-
- resources = namespaced.get("resources")
-
- if resources is None:
- requests = {
- "cpu": namespaced.pop("request_cpu", None),
- "memory": namespaced.pop("request_memory", None),
- "ephemeral-storage": namespaced.get("ephemeral-storage"), #
We pop this one in limits
- }
- limits = {
- "cpu": namespaced.pop("limit_cpu", None),
- "memory": namespaced.pop("limit_memory", None),
- "ephemeral-storage": namespaced.pop("ephemeral-storage", None),
- }
- all_resources = list(requests.values()) + list(limits.values())
- if all(r is None for r in all_resources):
- resources = None
- else:
- # remove None's so they don't become 0's
- requests = {k: v for k, v in requests.items() if v is not None}
- limits = {k: v for k, v in limits.items() if v is not None}
- resources = k8s.V1ResourceRequirements(requests=requests,
limits=limits)
- namespaced["resources"] = resources
- return PodGeneratorDeprecated(**namespaced).gen_pod()
-
@staticmethod
def reconcile_pods(base_pod: k8s.V1Pod, client_pod: k8s.V1Pod | None) ->
k8s.V1Pod:
"""
@@ -579,37 +492,6 @@ class PodGenerator:
api_client = ApiClient()
return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)
- @staticmethod
- @deprecated(
- reason="This method is deprecated. Use `add_pod_suffix` in
`kubernetes_helper_functions`.",
- category=AirflowProviderDeprecationWarning,
- )
- def make_unique_pod_id(pod_id: str) -> str | None:
- r"""
- Generate a unique Pod name.
-
- Kubernetes pod names must consist of one or more lowercase
- rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
- characters.
-
- Name must pass the following regex for validation
- ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
-
- For more details, see:
-
https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
-
- :param pod_id: requested pod name
- :return: ``str`` valid Pod name of appropriate length
- """
- if not pod_id:
- return None
-
- max_pod_id_len = 100 # arbitrarily chosen
- suffix = rand_str(8) # 8 seems good enough
- base_pod_id_len = max_pod_id_len - len(suffix) - 1 # -1 for separator
- trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.")
- return f"{trimmed_pod_id}-{suffix}"
-
def merge_objects(base_obj, client_obj):
"""
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
b/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
deleted file mode 100644
index e93be85e368..00000000000
--- a/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
+++ /dev/null
@@ -1,320 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Launches pods."""
-
-from __future__ import annotations
-
-import json
-import math
-import time
-import warnings
-from typing import TYPE_CHECKING, cast
-
-import pendulum
-import tenacity
-from kubernetes import client, watch
-from kubernetes.client.rest import ApiException
-from kubernetes.stream import stream as kubernetes_stream
-from requests.exceptions import HTTPError
-
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
-from airflow.providers.cncf.kubernetes.pod_generator import
PodDefaultsDeprecated
-from airflow.settings import pod_mutation_hook
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.state import State
-
-if TYPE_CHECKING:
- from kubernetes.client.models.v1_pod import V1Pod
-
-warnings.warn(
- """
- Please use :mod: Please use
`airflow.providers.cncf.kubernetes.utils.pod_manager`
-
- To use this module install the provider package by installing this pip
package:
-
- https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/
-
- """,
- RemovedInAirflow3Warning,
- stacklevel=2,
-)
-
-
-class PodStatus:
- """Status of the pods."""
-
- PENDING = "pending"
- RUNNING = "running"
- FAILED = "failed"
- SUCCEEDED = "succeeded"
-
-
-class PodLauncher(LoggingMixin):
- """
- Deprecated class for launching pods.
-
- Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager
instead.
- """
-
- def __init__(
- self,
- kube_client: client.CoreV1Api = None,
- in_cluster: bool = True,
- cluster_context: str | None = None,
- extract_xcom: bool = False,
- ):
- """
- Launch pods; DEPRECATED.
-
- Please use
airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager
- instead to create the launcher.
-
- :param kube_client: kubernetes client
- :param in_cluster: whether we are in cluster
- :param cluster_context: context of the cluster
- :param extract_xcom: whether we should extract xcom
- """
- super().__init__()
- self._client = kube_client or get_kube_client(in_cluster=in_cluster,
cluster_context=cluster_context)
- self._watch = watch.Watch()
- self.extract_xcom = extract_xcom
-
- def run_pod_async(self, pod: V1Pod, **kwargs):
- """Run pod asynchronously."""
- pod_mutation_hook(pod)
-
- sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
- json_pod = json.dumps(sanitized_pod, indent=2)
-
- self.log.debug("Pod Creation Request: \n%s", json_pod)
- try:
- resp = self._client.create_namespaced_pod(
- body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
- )
- self.log.debug("Pod Creation Response: %s", resp)
- except Exception as e:
- self.log.exception("Exception when attempting to create Namespaced
Pod: %s", json_pod)
- raise e
- return resp
-
- def delete_pod(self, pod: V1Pod):
- """Delete pod."""
- try:
- self._client.delete_namespaced_pod(
- pod.metadata.name, pod.metadata.namespace,
body=client.V1DeleteOptions()
- )
- except ApiException as e:
- # If the pod is already deleted
- if str(e.status) != "404":
- raise
-
- def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
- """
- Launch the pod synchronously and wait for completion.
-
- :param pod:
- :param startup_timeout: Timeout for startup of the pod (if pod is
pending for too long, fails task)
- :return:
- """
- resp = self.run_pod_async(pod)
- start_time = time.monotonic()
- if resp.status.start_time is None:
- while self.pod_not_started(pod):
- self.log.warning("Pod not yet started: %s", pod.metadata.name)
- if time.monotonic() >= start_time + startup_timeout:
- raise AirflowException("Pod took too long to start")
- time.sleep(1)
-
- def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str |
None]:
- """
- Monitor a pod and return the final state.
-
- :param pod: pod spec that will be monitored
- :param get_logs: whether to read the logs locally
- """
- if get_logs:
- read_logs_since_sec = None
- last_log_time: pendulum.DateTime | None = None
- while True:
- logs = self.read_pod_logs(pod, timestamps=True,
since_seconds=read_logs_since_sec)
- for line in logs:
- timestamp, message =
self.parse_log_line(line.decode("utf-8"))
- if timestamp:
- last_log_time = cast(pendulum.DateTime,
pendulum.parse(timestamp))
- self.log.info(message)
- time.sleep(1)
-
- if not self.base_container_is_running(pod):
- break
-
- self.log.warning("Pod %s log read interrupted",
pod.metadata.name)
- if last_log_time:
- delta = pendulum.now() - last_log_time
- # Prefer logs duplication rather than loss
- read_logs_since_sec = math.ceil(delta.total_seconds())
- result = None
- if self.extract_xcom:
- while self.base_container_is_running(pod):
- self.log.info("Container %s has state %s", pod.metadata.name,
State.RUNNING)
- time.sleep(2)
- result = self._extract_xcom(pod)
- self.log.info(result)
- result = json.loads(result)
- while self.pod_is_running(pod):
- self.log.info("Pod %s has state %s", pod.metadata.name,
State.RUNNING)
- time.sleep(2)
- return self._task_status(self.read_pod(pod)), result
-
- def parse_log_line(self, line: str) -> tuple[str | None, str]:
- """
- Parse K8s log line and returns the final state.
-
- :param line: k8s log line
- :return: timestamp and log message
- """
- timestamp, sep, message = line.strip().partition(" ")
- if not sep:
- self.log.error(
- "Error parsing timestamp (no timestamp in message: %r). "
- "Will continue execution but won't update timestamp",
- line,
- )
- return None, line
- return timestamp, message
-
- def _task_status(self, event):
- self.log.info("Event: %s had an event of type %s",
event.metadata.name, event.status.phase)
- status = self.process_status(event.metadata.name, event.status.phase)
- return status
-
- def pod_not_started(self, pod: V1Pod):
- """Test if pod has not started."""
- state = self._task_status(self.read_pod(pod))
- return state == State.QUEUED
-
- def pod_is_running(self, pod: V1Pod):
- """Test if pod is running."""
- state = self._task_status(self.read_pod(pod))
- return state not in (State.SUCCESS, State.FAILED)
-
- def base_container_is_running(self, pod: V1Pod):
- """Test if base container is running."""
- event = self.read_pod(pod)
- status = next((s for s in event.status.container_statuses if s.name ==
"base"), None)
- if not status:
- return False
- return status.state.running is not None
-
- @tenacity.retry(stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(), reraise=True)
- def read_pod_logs(
- self,
- pod: V1Pod,
- tail_lines: int | None = None,
- timestamps: bool = False,
- since_seconds: int | None = None,
- ):
- """Read log from the pod."""
- additional_kwargs = {}
- if since_seconds:
- additional_kwargs["since_seconds"] = since_seconds
-
- if tail_lines:
- additional_kwargs["tail_lines"] = tail_lines
-
- try:
- return self._client.read_namespaced_pod_log(
- name=pod.metadata.name,
- namespace=pod.metadata.namespace,
- container="base",
- follow=True,
- timestamps=timestamps,
- _preload_content=False,
- **additional_kwargs,
- )
- except HTTPError as e:
- raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
-
- @tenacity.retry(stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(), reraise=True)
- def read_pod_events(self, pod):
- """Read events from the pod."""
- try:
- return self._client.list_namespaced_event(
- namespace=pod.metadata.namespace,
field_selector=f"involvedObject.name={pod.metadata.name}"
- )
- except HTTPError as e:
- raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
-
- @tenacity.retry(stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(), reraise=True)
- def read_pod(self, pod: V1Pod):
- """Read pod information."""
- try:
- return self._client.read_namespaced_pod(pod.metadata.name,
pod.metadata.namespace)
- except HTTPError as e:
- raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
-
- def _extract_xcom(self, pod: V1Pod):
- resp = kubernetes_stream(
- self._client.connect_get_namespaced_pod_exec,
- pod.metadata.name,
- pod.metadata.namespace,
- container=PodDefaultsDeprecated.SIDECAR_CONTAINER_NAME,
- command=["/bin/sh"],
- stdin=True,
- stdout=True,
- stderr=True,
- tty=False,
- _preload_content=False,
- )
- try:
- result = self._exec_pod_command(resp, f"cat
{PodDefaultsDeprecated.XCOM_MOUNT_PATH}/return.json")
- self._exec_pod_command(resp, "kill -s SIGINT 1")
- finally:
- resp.close()
- if result is None:
- raise AirflowException(f"Failed to extract xcom from pod:
{pod.metadata.name}")
- return result
-
- def _exec_pod_command(self, resp, command):
- if resp.is_open():
- self.log.info("Running command... %s\n", command)
- resp.write_stdin(command + "\n")
- while resp.is_open():
- resp.update(timeout=1)
- if resp.peek_stdout():
- return resp.read_stdout()
- if resp.peek_stderr():
- self.log.info(resp.read_stderr())
- break
- return None
-
- def process_status(self, job_id, status):
- """Process status information for the job."""
- status = status.lower()
- if status == PodStatus.PENDING:
- return State.QUEUED
- elif status == PodStatus.FAILED:
- self.log.error("Event with job id %s Failed", job_id)
- return State.FAILED
- elif status == PodStatus.SUCCEEDED:
- self.log.info("Event with job id %s Succeeded", job_id)
- return State.SUCCESS
- elif status == PodStatus.RUNNING:
- return State.RUNNING
- else:
- self.log.error("Event: Invalid state %s on job %s", status, job_id)
- return State.FAILED
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py
b/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py
deleted file mode 100644
index 17ddc9aee8f..00000000000
--- a/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module is deprecated. Please use
:mod:`airflow.providers.cncf.kubernetes.triggers.pod` instead."""
-
-from __future__ import annotations
-
-import warnings
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.triggers.pod import * # noqa: F403
-
-warnings.warn(
- "This module is deprecated. Please use
`airflow.providers.cncf.kubernetes.triggers.pod` instead.",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
-)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
b/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 102dc438006..51b86647f36 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -19,12 +19,10 @@ from __future__ import annotations
import asyncio
import datetime
import traceback
-import warnings
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, AsyncIterator
-from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.hooks.kubernetes import
AsyncKubernetesHook
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
OnFinishAction,
@@ -71,10 +69,6 @@ class KubernetesPodTrigger(BaseTrigger):
:param on_finish_action: What to do when the pod reaches its final state,
or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless its state; if
"delete_succeeded_pod",
only succeeded pod will be deleted. You can set to "keep_pod" to keep
the pod.
- :param should_delete_pod: What to do when the pod reaches its final
- state, or the execution is interrupted. If True (default), delete the
- pod; if False, leave the pod.
- Deprecated - use `on_finish_action` instead.
:param logging_interval: number of seconds to wait before kicking it back
to
the operator to print latest logs. If ``None`` will wait until
container done.
:param last_log_time: where to resume logs from
@@ -95,7 +89,6 @@ class KubernetesPodTrigger(BaseTrigger):
startup_timeout: int = 120,
startup_check_interval: int = 5,
on_finish_action: str = "delete_pod",
- should_delete_pod: bool | None = None,
last_log_time: DateTime | None = None,
logging_interval: int | None = None,
):
@@ -114,20 +107,7 @@ class KubernetesPodTrigger(BaseTrigger):
self.startup_check_interval = startup_check_interval
self.last_log_time = last_log_time
self.logging_interval = logging_interval
-
- if should_delete_pod is not None:
- warnings.warn(
- "`should_delete_pod` parameter is deprecated, please use
`on_finish_action`",
- category=AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- self.on_finish_action = (
- OnFinishAction.DELETE_POD if should_delete_pod else
OnFinishAction.KEEP_POD
- )
- self.should_delete_pod = should_delete_pod
- else:
- self.on_finish_action = OnFinishAction(on_finish_action)
- self.should_delete_pod = self.on_finish_action ==
OnFinishAction.DELETE_POD
+ self.on_finish_action = OnFinishAction(on_finish_action)
self._since_time = None
@@ -148,7 +128,6 @@ class KubernetesPodTrigger(BaseTrigger):
"startup_timeout": self.startup_timeout,
"startup_check_interval": self.startup_check_interval,
"trigger_start_time": self.trigger_start_time,
- "should_delete_pod": self.should_delete_pod,
"on_finish_action": self.on_finish_action.value,
"last_log_time": self.last_log_time,
"logging_interval": self.logging_interval,
diff --git
a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index cd91dc09281..aa8e812921c 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -26,11 +26,10 @@ from collections.abc import Iterable
from contextlib import closing, suppress
from dataclasses import dataclass
from datetime import timedelta
-from typing import TYPE_CHECKING, Callable, Generator, Protocol, cast
+from typing import TYPE_CHECKING, Generator, Protocol, cast
import pendulum
import tenacity
-from deprecated import deprecated
from kubernetes import client, watch
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
@@ -39,7 +38,7 @@ from pendulum.parsing.exceptions import ParserError
from typing_extensions import Literal
from urllib3.exceptions import HTTPError, TimeoutError
-from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode,
KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -302,19 +301,15 @@ class PodManager(LoggingMixin):
self,
kube_client: client.CoreV1Api,
callbacks: type[KubernetesPodOperatorCallback] | None = None,
- progress_callback: Callable[[str], None] | None = None,
):
"""
Create the launcher.
:param kube_client: kubernetes client
:param callbacks:
- :param progress_callback: Callback function invoked when fetching
container log.
- This parameter is deprecated, please use ````
"""
super().__init__()
self._client = kube_client
- self._progress_callback = progress_callback
self._watch = watch.Watch()
self._callbacks = callbacks
@@ -383,16 +378,6 @@ class PodManager(LoggingMixin):
raise PodLaunchFailedException(msg)
time.sleep(startup_check_interval)
- @deprecated(
- reason=(
- "Method `follow_container_logs` is deprecated. Use
`fetch_container_logs` instead "
- "with option `follow=True`."
- ),
- category=AirflowProviderDeprecationWarning,
- )
- def follow_container_logs(self, pod: V1Pod, container_name: str) ->
PodLoggingStatus:
- return self.fetch_container_logs(pod=pod,
container_name=container_name, follow=True)
-
def fetch_container_logs(
self,
pod: V1Pod,
@@ -461,8 +446,6 @@ class PodManager(LoggingMixin):
progress_callback_lines.append(line)
else: # previous log line is complete
for line in progress_callback_lines:
- if self._progress_callback:
- self._progress_callback(line)
if self._callbacks:
self._callbacks.progress_callback(
line=line, client=self._client,
mode=ExecutionMode.SYNC
@@ -479,8 +462,6 @@ class PodManager(LoggingMixin):
finally:
# log the last line and update the last_captured_timestamp
for line in progress_callback_lines:
- if self._progress_callback:
- self._progress_callback(line)
if self._callbacks:
self._callbacks.progress_callback(
line=line, client=self._client,
mode=ExecutionMode.SYNC
diff --git
a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
index ea143edd829..f5db8b806af 100644
--- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -28,7 +28,7 @@ from kubernetes.client import models as k8s
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse
-from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
@@ -52,12 +52,12 @@ from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils impor
get_base_pod_from_template,
)
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
+ add_unique_suffix,
annotations_for_logging_task_metadata,
annotations_to_key,
create_unique_id,
get_logs_task_metadata,
)
-from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import timezone
from airflow.utils.state import State, TaskInstanceState
@@ -106,8 +106,7 @@ class TestAirflowKubernetesScheduler:
def test_create_pod_id(self):
for dag_id, task_id in self._cases():
- with pytest.warns(AirflowProviderDeprecationWarning,
match=r"deprecated\. Use `add_pod_suffix`"):
- pod_name =
PodGenerator.make_unique_pod_id(create_unique_id(dag_id, task_id))
+ pod_name = add_unique_suffix(name=create_unique_id(dag_id,
task_id))
assert self._is_valid_pod_id(pod_name), f"dag_id={dag_id!r},
task_id={task_id!r}"
@mock.patch("airflow.providers.cncf.kubernetes.pod_generator.PodGenerator")
diff --git a/providers/tests/cncf/kubernetes/models/test_secret.py
b/providers/tests/cncf/kubernetes/models/test_secret.py
index 51fd17ca71f..1f299529687 100644
--- a/providers/tests/cncf/kubernetes/models/test_secret.py
+++ b/providers/tests/cncf/kubernetes/models/test_secret.py
@@ -62,12 +62,9 @@ class TestSecret:
)
@mock.patch("uuid.uuid4")
- @mock.patch("airflow.providers.cncf.kubernetes.pod_generator.rand_str")
- def test_attach_to_pod(self, mock_rand_str, mock_uuid, data_file):
+ def test_attach_to_pod(self, mock_uuid, data_file):
static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
mock_uuid.return_value = static_uuid
- rand_str = "abcd1234"
- mock_rand_str.return_value = rand_str
template_file = data_file("pods/generator_base.yaml").as_posix()
pod = PodGenerator(pod_template_file=template_file).ud_pod
secrets = [
diff --git a/providers/tests/cncf/kubernetes/operators/test_pod.py
b/providers/tests/cncf/kubernetes/operators/test_pod.py
index a39b1c6d889..d6b7378c80c 100644
--- a/providers/tests/cncf/kubernetes/operators/test_pod.py
+++ b/providers/tests/cncf/kubernetes/operators/test_pod.py
@@ -31,7 +31,6 @@ from urllib3 import HTTPResponse
from airflow.exceptions import (
AirflowException,
- AirflowProviderDeprecationWarning,
AirflowSkipException,
TaskDeferred,
)
@@ -1305,31 +1304,6 @@ class TestKubernetesPodOperator:
else:
mock_await.assert_not_called()
- @pytest.mark.parametrize(
- "on_finish_action",
- # Regardless what we provide in `on_finish_action`
- # it doesn't take any affect if `is_delete_operator_pod` provided.
- [*sorted(OnFinishAction.__members__.values()), None],
- )
- @pytest.mark.parametrize(
- "is_delete_operator_pod, expected_on_finish_action",
- [
- pytest.param(True, "delete_pod", id="delete-operator-pod"),
- pytest.param(False, "keep_pod", id="keep-operator-pod"),
- ],
- )
- def test_deprecated_is_delete_operator_pod(
- self, is_delete_operator_pod, expected_on_finish_action,
on_finish_action
- ):
- with pytest.warns(AirflowProviderDeprecationWarning, match="please use
`on_finish_action`"):
- op = KubernetesPodOperator(
- task_id="task",
- is_delete_operator_pod=is_delete_operator_pod,
- on_finish_action=on_finish_action,
- )
- assert op.is_delete_operator_pod == is_delete_operator_pod
- assert op.on_finish_action == expected_on_finish_action
-
@pytest.mark.parametrize(
"task_kwargs, should_fail, should_be_deleted",
[
@@ -2272,15 +2246,6 @@ class TestKubernetesPodOperatorAsync:
},
)
- def test_deprecated_execute_complete(self):
- fake_context = mock.sentinel.context
- fake_event = mock.sentinel.event
- with mock.patch.object(KubernetesPodOperator, "trigger_reentry") as
mocked_trigger_reentry:
- op = KubernetesPodOperator(task_id="test-task")
- with pytest.warns(AirflowProviderDeprecationWarning, match="use
`trigger_reentry` instead"):
- op.execute_complete(fake_context, fake_event)
- mocked_trigger_reentry.assert_called_once_with(context=fake_context,
event=fake_event)
-
@pytest.mark.parametrize("do_xcom_push", [True, False])
@patch(KUB_OP_PATH.format("extract_xcom"))
diff --git
a/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
b/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
index bfe4f98f3e1..5dce063d572 100644
--- a/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
+++ b/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
@@ -18,12 +18,10 @@
from __future__ import annotations
import re
-from unittest import mock
import pytest
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
create_pod_id, create_unique_id
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
create_unique_id
pod_name_regex =
r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
@@ -110,20 +108,3 @@ class TestCreateUniqueId:
assert re.match(r"-[a-z0-9]{8}", actual[-9:])
else:
assert actual == base[:length]
-
- @pytest.mark.parametrize("dag_id", ["fake-dag", None])
- @pytest.mark.parametrize("task_id", ["fake-task", None])
- @pytest.mark.parametrize("max_length", [10, 42, None])
- @pytest.mark.parametrize("unique", [True, False])
- def test_back_compat_create_pod_id(self, dag_id, task_id, max_length,
unique):
- with mock.patch(
-
"airflow.providers.cncf.kubernetes.kubernetes_helper_functions.create_unique_id"
- ) as mocked_create_unique_id:
- with pytest.warns(
- AirflowProviderDeprecationWarning, match=r"deprecated. Please
use `create_unique_id`"
- ):
- create_pod_id(dag_id, task_id, max_length=max_length,
unique=unique)
-
- mocked_create_unique_id.assert_called_once_with(
- dag_id=dag_id, task_id=task_id, max_length=max_length,
unique=unique
- )
diff --git a/providers/tests/cncf/kubernetes/test_pod_generator.py
b/providers/tests/cncf/kubernetes/test_pod_generator.py
index 1a18693852a..30d1df2a0d7 100644
--- a/providers/tests/cncf/kubernetes/test_pod_generator.py
+++ b/providers/tests/cncf/kubernetes/test_pod_generator.py
@@ -26,10 +26,10 @@ from dateutil import parser
from kubernetes.client import ApiClient, models as k8s
from airflow import __version__
-from airflow.exceptions import AirflowConfigException,
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowConfigException
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import
PodReconciliationError
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
add_unique_suffix
from airflow.providers.cncf.kubernetes.pod_generator import (
- PodDefaultsDeprecated,
PodGenerator,
datetime_to_label_safe_datestring,
extend_object_field,
@@ -160,41 +160,6 @@ class TestPodGenerator:
),
)
-
@mock.patch("airflow.providers.cncf.kubernetes.kubernetes_helper_functions.rand_str")
- def test_gen_pod_extract_xcom(self, mock_rand_str, data_file):
- """
- Method gen_pod is used nowhere in codebase and is deprecated.
- This test is only retained for backcompat.
- """
- mock_rand_str.return_value = self.rand_str
- template_file =
data_file("pods/generator_base_with_secrets.yaml").as_posix()
-
- pod_generator = PodGenerator(pod_template_file=template_file,
extract_xcom=True)
- with pytest.warns(AirflowProviderDeprecationWarning):
- result = pod_generator.gen_pod()
- container_two = {
- "name": "airflow-xcom-sidecar",
- "image": "alpine",
- "command": ["sh", "-c", PodDefaultsDeprecated.XCOM_CMD],
- "volumeMounts": [{"name": "xcom", "mountPath": "/airflow/xcom"}],
- "resources": {"requests": {"cpu": "1m"}},
- }
- self.expected.spec.containers.append(container_two)
- base_container: k8s.V1Container = self.expected.spec.containers[0]
- base_container.volume_mounts = base_container.volume_mounts or []
- base_container.volume_mounts.append(k8s.V1VolumeMount(name="xcom",
mount_path="/airflow/xcom"))
- self.expected.spec.containers[0] = base_container
- self.expected.spec.volumes = self.expected.spec.volumes or []
- self.expected.spec.volumes.append(
- k8s.V1Volume(
- name="xcom",
- empty_dir={},
- )
- )
- result_dict = self.k8s_client.sanitize_for_serialization(result)
- expected_dict =
self.k8s_client.sanitize_for_serialization(self.expected)
- assert result_dict == expected_dict
-
def test_from_obj_pod_override_object(self):
obj = {
"pod_override": k8s.V1Pod(
@@ -240,54 +205,6 @@ class TestPodGenerator:
},
}
- def test_from_obj_legacy(self):
- obj = {
- "KubernetesExecutor": {
- "annotations": {"test": "annotation"},
- "volumes": [
- {
- "name": "example-kubernetes-test-volume",
- "hostPath": {"path": "/tmp/"},
- },
- ],
- "volume_mounts": [
- {
- "mountPath": "/foo/",
- "name": "example-kubernetes-test-volume",
- },
- ],
- }
- }
- with pytest.warns(
- AirflowProviderDeprecationWarning,
- match="Using a dictionary for the executor_config is deprecated
and will soon be removed",
- ):
- result = PodGenerator.from_obj(obj)
-
- assert self.k8s_client.sanitize_for_serialization(result) == {
- "apiVersion": "v1",
- "kind": "Pod",
- "metadata": {
- "annotations": {"test": "annotation"},
- },
- "spec": {
- "containers": [
- {
- "args": [],
- "command": [],
- "env": [],
- "envFrom": [],
- "name": "base",
- "ports": [],
- "volumeMounts": [{"mountPath": "/foo/", "name":
"example-kubernetes-test-volume"}],
- }
- ],
- "hostNetwork": False,
- "imagePullSecrets": [],
- "volumes": [{"hostPath": {"path": "/tmp/"}, "name":
"example-kubernetes-test-volume"}],
- },
- }
-
def test_from_obj_both(self):
obj = {
"pod_override": k8s.V1Pod(
@@ -725,16 +642,13 @@ class TestPodGenerator:
),
)
def test_pod_name_confirm_to_max_length(self, input):
- with pytest.warns(
- AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in
`kubernetes_helper_functions`"
- ):
- actual = PodGenerator.make_unique_pod_id(input)
- assert len(actual) <= 100
+ actual = add_unique_suffix(name=input)
+ assert len(actual) <= 63
actual_base, actual_suffix = actual.rsplit("-", maxsplit=1)
# we limit pod id length to 100
# random suffix is 8 chars plus the '-' separator
- # so actual pod id base should first 91 chars of requested pod id
- assert actual_base == input[:91]
+ # so actual pod id base should be the first 54 chars of requested pod id
+ assert actual_base == input[:54]
# suffix should always be 8, the random alphanum
assert re.match(r"^[a-z0-9]{8}$", actual_suffix)
@@ -743,7 +657,7 @@ class TestPodGenerator:
(
(
"somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen-",
-
"somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen",
+ "somewhat-long-pod-name-maybe-longer-than-previously-su",
),
("pod-name-with-hyphen-", "pod-name-with-hyphen"),
("pod-name-with-double-hyphen--", "pod-name-with-double-hyphen"),
@@ -759,10 +673,7 @@ class TestPodGenerator:
`make_unique_pod_id` doesn't actually guarantee that the regex passes
for any input.
But I guess this test verifies that an otherwise valid pod_id doesn't
get _screwed up_.
"""
- with pytest.warns(
- AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in
`kubernetes_helper_functions`"
- ):
- actual = PodGenerator.make_unique_pod_id(pod_id)
+ actual = add_unique_suffix(name=pod_id)
assert len(actual) <= 253
assert actual == actual.lower(), "not lowercase"
# verify using official k8s regex
diff --git a/providers/tests/cncf/kubernetes/triggers/test_pod.py
b/providers/tests/cncf/kubernetes/triggers/test_pod.py
index 0e7522480a8..9b3a21d023f 100644
--- a/providers/tests/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/tests/cncf/kubernetes/triggers/test_pod.py
@@ -108,7 +108,6 @@ class TestKubernetesPodTrigger:
"startup_check_interval": 5,
"trigger_start_time": TRIGGER_START_TIME,
"on_finish_action": ON_FINISH_ACTION,
- "should_delete_pod": ON_FINISH_ACTION == "delete_pod",
"last_log_time": None,
"logging_interval": None,
}
diff --git a/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
b/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
index b577ea969ea..d220befc219 100644
--- a/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
@@ -49,12 +49,10 @@ if TYPE_CHECKING:
class TestPodManager:
def setup_method(self):
- self.mock_progress_callback = mock.Mock()
self.mock_kube_client = mock.Mock()
self.pod_manager = PodManager(
kube_client=self.mock_kube_client,
callbacks=MockKubernetesPodOperatorCallback,
- progress_callback=self.mock_progress_callback,
)
def test_read_pod_logs_successfully_returns_logs(self):
@@ -294,19 +292,6 @@ class TestPodManager:
assert status.last_log_time == cast("DateTime",
pendulum.parse(timestamp_string))
-
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
-
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
- def test_fetch_container_logs_invoke_deprecated_progress_callback(
- self, mock_read_pod_logs, mock_container_is_running
- ):
- message = "2020-10-08T14:16:17.793417674Z message"
- no_ts_message = "notimestamp"
- mock_read_pod_logs.return_value = [bytes(message, "utf-8"),
bytes(no_ts_message, "utf-8")]
- mock_container_is_running.return_value = False
-
- self.pod_manager.fetch_container_logs(mock.MagicMock(),
mock.MagicMock(), follow=True)
- self.mock_progress_callback.assert_has_calls([mock.call(message),
mock.call(no_ts_message)])
-
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_fetch_container_logs_invoke_progress_callback(
@@ -352,7 +337,6 @@ class TestPodManager:
mock_container_is_running.side_effect = [True, True, False]
status = self.pod_manager.fetch_container_logs(mock.MagicMock(),
mock.MagicMock(), follow=True)
assert status.last_log_time == cast("DateTime",
pendulum.parse(last_timestamp_string))
- assert self.mock_progress_callback.call_count == expected_call_count
assert mock_callbacks.progress_callback.call_count ==
expected_call_count
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
diff --git a/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
b/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
index 844e2f96332..076ce183710 100644
--- a/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
+++ b/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
@@ -27,7 +27,13 @@ import pytest
from google.cloud.container_v1.types import Operation
from kubernetes.client import models as k8s
-from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import
ContainerState
+try:
+ from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState
+except ImportError:
+ # preserve backward compatibility for older versions of cncf.kubernetes
provider, remove this when minimum cncf.kubernetes provider is 10.0
+ from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import ( #
type: ignore[no-redef]
+ ContainerState,
+ )
from airflow.providers.google.cloud.triggers.kubernetes_engine import (
GKEJobTrigger,
GKEOperationTrigger,
diff --git a/scripts/in_container/run_provider_yaml_files_check.py
b/scripts/in_container/run_provider_yaml_files_check.py
index ab5ebcac697..7d5756b2e7e 100755
--- a/scripts/in_container/run_provider_yaml_files_check.py
+++ b/scripts/in_container/run_provider_yaml_files_check.py
@@ -48,8 +48,6 @@ from airflow.providers_manager import ProvidersManager
DEPRECATED_MODULES = [
"airflow.providers.apache.hdfs.sensors.hdfs",
"airflow.providers.apache.hdfs.hooks.hdfs",
- "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod",
- "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
"airflow.providers.tabular.hooks.tabular",
"airflow.providers.yandex.hooks.yandexcloud_dataproc",
"airflow.providers.yandex.operators.yandexcloud_dataproc",
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index 683fa7ce797..78534accd18 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -555,10 +555,6 @@ class
TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
class TestCncfProviderProjectStructure(ExampleCoverageTest):
PROVIDER = "cncf"
CLASS_DIRS = ProjectStructureTest.CLASS_DIRS
- DEPRECATED_CLASSES = {
- "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
- "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod",
- }
BASE_CLASSES =
{"airflow.providers.cncf.kubernetes.operators.resource.KubernetesResourceBaseOperator"}