This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d85a0466d9 Remove deprecations cncf.kubernetes (#43689)
6d85a0466d9 is described below

commit 6d85a0466d91d501af87c8904b902ea92cee466d
Author: rom sharon <[email protected]>
AuthorDate: Sat Nov 9 22:43:40 2024 +0200

    Remove deprecations cncf.kubernetes (#43689)
    
    * change default namespace value to be None
    
    * revert ui change
    
    * remove deprecations from cncf.kubernetes
    
    * add changelog
    
    * add list of changes in changelog
    
    * Update providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
    
    * fix tests
    
    * fix tests
    
    * update changelog, remove deprecation warnings from tests
    
    * add todo comment for removing import
    
    * remove kubernetes_pod as deprecated module
    
    * remove unnecessary test, add import for deprecation
    
    * fix tests
    
    * fix tests and imports
    
    * fix tests
    
    * add ignore re-def in correct place
    
    ---------
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow/cli/commands/kubernetes_command.py         |   4 +-
 .../operators.rst                                  |   2 +-
 kubernetes_tests/test_kubernetes_pod_operator.py   |  18 --
 .../airflow/providers/amazon/aws/operators/eks.py  |   6 +-
 .../providers/cncf/kubernetes/CHANGELOG.rst        |  28 ++
 .../cncf/kubernetes/kubernetes_helper_functions.py |  41 ---
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  31 --
 .../providers/cncf/kubernetes/operators/pod.py     |  32 +--
 .../cncf/kubernetes/operators/spark_kubernetes.py  |   2 -
 .../providers/cncf/kubernetes/pod_generator.py     | 118 --------
 .../cncf/kubernetes/pod_launcher_deprecated.py     | 320 ---------------------
 .../cncf/kubernetes/triggers/kubernetes_pod.py     |  31 --
 .../providers/cncf/kubernetes/triggers/pod.py      |  23 +-
 .../providers/cncf/kubernetes/utils/pod_manager.py |  23 +-
 .../executors/test_kubernetes_executor.py          |   7 +-
 .../tests/cncf/kubernetes/models/test_secret.py    |   5 +-
 .../tests/cncf/kubernetes/operators/test_pod.py    |  35 ---
 .../kubernetes/test_kubernetes_helper_functions.py |  21 +-
 .../tests/cncf/kubernetes/test_pod_generator.py    | 105 +------
 .../tests/cncf/kubernetes/triggers/test_pod.py     |   1 -
 .../cncf/kubernetes/utils/test_pod_manager.py      |  16 --
 .../cloud/triggers/test_kubernetes_engine.py       |   8 +-
 .../in_container/run_provider_yaml_files_check.py  |   2 -
 tests/always/test_project_structure.py             |   4 -
 24 files changed, 61 insertions(+), 822 deletions(-)

diff --git a/airflow/cli/commands/kubernetes_command.py 
b/airflow/cli/commands/kubernetes_command.py
index 2a6fccf14d1..eab11133c9d 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -31,7 +31,7 @@ from airflow.models import DagRun, TaskInstance
 from airflow.providers.cncf.kubernetes import pod_generator
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import 
KubeConfig
 from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
-from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
create_pod_id
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
create_unique_id
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.utils import cli as cli_utils, yaml
 from airflow.utils.cli import get_dag
@@ -59,7 +59,7 @@ def generate_pod_yaml(args):
         pod = PodGenerator.construct_pod(
             dag_id=args.dag_id,
             task_id=ti.task_id,
-            pod_id=create_pod_id(args.dag_id, ti.task_id),
+            pod_id=create_unique_id(args.dag_id, ti.task_id),
             try_number=ti.try_number,
             kube_image=kube_config.kube_image,
             date=ti.execution_date,
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst 
b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 2268a8655c9..dfc54e092fe 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -206,7 +206,7 @@ Read more on termination-log `here 
<https://kubernetes.io/docs/tasks/debug/debug
 KubernetesPodOperator callbacks
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-The 
:class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`
 supports different
+The 
:class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator` 
supports different
 callbacks that can be used to trigger actions during the lifecycle of the pod. 
In order to use them, you need to
 create a subclass of 
:class:`~airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback`
 and override
 the callbacks methods you want to use. Then you can pass your callback class 
to the operator using the ``callbacks``
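
The callbacks mechanism described above is also what replaces the removed ``progress_callback`` parameter. A minimal, hypothetical sketch follows; the ``progress_callback`` hook signature is inferred from the ``PodManager`` call sites later in this diff, and the image/command values are only illustrative:

    from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


    class LogLineCallback(KubernetesPodOperatorCallback):
        """Receives every fetched container log line (what progress_callback used to do)."""

        @staticmethod
        def progress_callback(*, line, client, mode, **kwargs) -> None:
            # PodManager calls this for each log line it fetches
            print(f"[{mode}] {line}")


    k = KubernetesPodOperator(
        task_id="callback-example",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo 10"],
        callbacks=LogLineCallback,  # pass the class itself, as the docs describe
    )
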
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 13bf99d728d..99e30c710d2 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -1201,24 +1201,6 @@ class TestKubernetesPodOperatorSystem:
         self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
         assert self.expected_pod["spec"] == actual_pod["spec"]
 
-    def test_progess_call(self, mock_get_connection):
-        progress_callback = MagicMock()
-        k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels=self.labels,
-            task_id=str(uuid4()),
-            in_cluster=False,
-            do_xcom_push=False,
-            get_logs=True,
-            progress_callback=progress_callback,
-        )
-        context = create_context(k)
-        k.execute(context)
-        progress_callback.assert_called()
-
     def test_changing_base_container_name_no_logs(self, mock_get_connection):
         """
         This test checks BOTH a modified base container name AND the 
get_logs=False flow,
diff --git a/providers/src/airflow/providers/amazon/aws/operators/eks.py 
b/providers/src/airflow/providers/amazon/aws/operators/eks.py
index fa82cdcc72d..d90103362d0 100644
--- a/providers/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/src/airflow/providers/amazon/aws/operators/eks.py
@@ -45,8 +45,10 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import OnFinishAction
 try:
     from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
 except ImportError:
-    # preserve backward compatibility for older versions of cncf.kubernetes 
provider
-    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
+    # preserve backward compatibility for older versions of the cncf.kubernetes provider; remove this once the minimum supported cncf.kubernetes provider version is 10.0
+    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (  
# type: ignore[no-redef]
+        KubernetesPodOperator,
+    )
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
diff --git a/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst 
b/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 28417d6ac3d..abeebdc7d2e 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -31,6 +31,34 @@ Changelog
 main
 .....
 
+All deprecated classes, parameters and features have been removed from the 
Kubernetes provider package.
+The following breaking changes were introduced:
+
+* Helpers
+   * Remove ``add_pod_suffix`` function from ``kubernetes_helper_functions.py``. Use ``add_unique_suffix`` instead.
+   * Remove ``make_unique_pod_id`` method from ``PodGenerator``. Use 
``add_unique_suffix`` in ``kubernetes_helper_functions`` instead.
+   * Remove ``create_pod_id`` function from ``kubernetes_helper_functions.py``. Use ``create_unique_id`` instead.
+   * Remove ``gen_pod`` method from ``PodGenerator``.
+   * Remove ``add_xcom_sidecar`` method from ``PodGenerator``. Use 
``airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar`` 
instead.
+   * Remove the option to use a dictionary for the executor_config in the ``from_obj`` method of ``PodGenerator``. Use a ``kubernetes.client.models.V1Pod`` object with a "pod_override" key instead.
+   * Remove ``from_legacy_obj`` method from ``PodGenerator``.
+   * Remove ``airflow.providers.cncf.kubernetes.pod_launcher_deprecated`` 
module. Use ``airflow.providers.cncf.kubernetes.utils.pod_manager`` instead.
+
+* Operators
+   * Remove ``airflow.providers.cncf.kubernetes.operators.kubernetes_pod``. 
Use ``airflow.providers.cncf.kubernetes.operators.pod`` instead.
+   * Remove ``is_delete_operator_pod`` parameter from ``KubernetesPodOperator``. Use ``on_finish_action`` instead.
+   * Remove ``progress_callback`` parameter from ``KubernetesPodOperator``. Use ``callbacks`` instead.
+   * Remove ``execute_complete`` method from ``KubernetesPodOperator``. Use 
``trigger_reentry`` instead.
+   * Remove ``xcom_push`` parameter from ``SparkKubernetesOperator``. Use ``do_xcom_push`` instead.
+
+* Triggers
+   * Remove ``should_delete_pod`` parameter from ``KubernetesPodTrigger``. Use 
``on_finish_action`` instead.
+
+* Utils
+   * Remove ``progress_callback`` parameter from ``PodManager``.
+   * Remove ``follow_container_logs`` method from ``PodManager``. Use ``fetch_container_logs`` with ``follow=True`` instead.
+
+
 .. warning::
   Set the default value of ``namespace`` in ``@task.kubernetes`` to ``None``, 
so it uses the cluster namespace when ``in_cluster`` is True. Be sure to 
specify a namespace when using this decorator. To retain the previous behavior, 
set ``namespace="default"``
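
Since the changelog entries above only name the removals, here is a minimal migration sketch. It is hypothetical: the task_id, image, and annotation values are illustrative, and ``MyCallbacks`` stands in for a user-defined callback class.

    from kubernetes.client import models as k8s

    from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


    class MyCallbacks(KubernetesPodOperatorCallback):
        """Hypothetical callback class; replaces the removed progress_callback hook."""


    # Before (now removed):
    #   KubernetesPodOperator(..., is_delete_operator_pod=True, progress_callback=fn)
    k = KubernetesPodOperator(
        task_id="migrated-task",
        image="ubuntu:16.04",
        on_finish_action="delete_pod",  # or "delete_succeeded_pod" / "keep_pod"
        callbacks=MyCallbacks,
    )

    # executor_config must now carry a V1Pod under a "pod_override" key; the legacy
    # {"KubernetesExecutor": {...}} dictionary form is no longer converted:
    executor_config = {
        "pod_override": k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}),
        )
    }

    # Other one-line renames from the list above:
    #   SparkKubernetesOperator(xcom_push=True)      -> SparkKubernetesOperator(do_xcom_push=True)
    #   KubernetesPodTrigger(should_delete_pod=True) -> KubernetesPodTrigger(on_finish_action="delete_pod")
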
 
diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
 
b/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
index c603f8a178b..813edcc6a6d 100644
--- 
a/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
+++ 
b/providers/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
@@ -23,12 +23,10 @@ from functools import cache
 from typing import TYPE_CHECKING
 
 import pendulum
-from deprecated import deprecated
 from kubernetes.client.rest import ApiException
 from slugify import slugify
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowProviderDeprecationWarning
 
 if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
@@ -62,22 +60,6 @@ def add_unique_suffix(*, name: str, rand_len: int = 8, 
max_len: int = POD_NAME_M
     return name[: max_len - len(suffix)].strip("-.") + suffix
 
 
-@deprecated(
-    reason="This function is deprecated. Please use `add_unique_suffix`",
-    category=AirflowProviderDeprecationWarning,
-)
-def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = 
POD_NAME_MAX_LENGTH) -> str:
-    """
-    Add random string to pod name while staying under max length.
-
-    :param pod_name: name of the pod
-    :param rand_len: length of the random string to append
-    :param max_len: maximum length of the pod name
-    :meta private:
-    """
-    return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)
-
-
 def create_unique_id(
     dag_id: str | None = None,
     task_id: str | None = None,
@@ -110,29 +92,6 @@ def create_unique_id(
         return base_name
 
 
-@deprecated(
-    reason="This function is deprecated. Please use `create_unique_id`.",
-    category=AirflowProviderDeprecationWarning,
-)
-def create_pod_id(
-    dag_id: str | None = None,
-    task_id: str | None = None,
-    *,
-    max_length: int = POD_NAME_MAX_LENGTH,
-    unique: bool = True,
-) -> str:
-    """
-    Generate unique pod ID given a dag_id and / or task_id.
-
-    :param dag_id: DAG ID
-    :param task_id: Task ID
-    :param max_length: max number of characters
-    :param unique: whether a random string suffix should be added
-    :return: A valid identifier for a kubernetes pod name
-    """
-    return create_unique_id(dag_id=dag_id, task_id=task_id, 
max_length=max_length, unique=unique)
-
-
 def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
     """Build a TaskInstanceKey based on pod annotations."""
     log.debug("Creating task key for annotations %s", annotations)
diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
deleted file mode 100644
index 3b3e6f7d952..00000000000
--- 
a/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module is deprecated. Please use 
:mod:`airflow.providers.cncf.kubernetes.operators.pod` instead."""
-
-from __future__ import annotations
-
-import warnings
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.operators.pod import *  # noqa: F403
-
-warnings.warn(
-    "This module is deprecated. Please use 
`airflow.providers.cncf.kubernetes.operators.pod` instead.",
-    AirflowProviderDeprecationWarning,
-    stacklevel=2,
-)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py 
b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 62f08439d41..2c99126230c 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -26,7 +26,6 @@ import os
 import re
 import shlex
 import string
-import warnings
 from collections.abc import Container, Mapping
 from contextlib import AbstractContextManager
 from enum import Enum
@@ -35,7 +34,6 @@ from typing import TYPE_CHECKING, Any, Callable, Iterable, 
Literal, Sequence
 
 import kubernetes
 import tenacity
-from deprecated import deprecated
 from kubernetes.client import CoreV1Api, V1Pod, models as k8s
 from kubernetes.client.exceptions import ApiException
 from kubernetes.stream import stream
@@ -44,7 +42,6 @@ from urllib3.exceptions import HTTPError
 from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException,
-    AirflowProviderDeprecationWarning,
     AirflowSkipException,
     TaskDeferred,
 )
@@ -215,18 +212,12 @@ class KubernetesPodOperator(BaseOperator):
     :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
         If "delete_pod", the pod will be deleted regardless its state; if 
"delete_succeeded_pod",
         only succeeded pod will be deleted. You can set to "keep_pod" to keep 
the pod.
-    :param is_delete_operator_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted. If True (default), delete the
-        pod; if False, leave the pod.
-        Deprecated - use `on_finish_action` instead.
     :param termination_message_policy: The termination message policy of the 
base container.
         Default value is "File"
     :param active_deadline_seconds: The active_deadline_seconds which 
translates to active_deadline_seconds
         in V1PodSpec.
     :param callbacks: KubernetesPodOperatorCallback instance contains the 
callbacks methods on different step
         of KubernetesPodOperator.
-    :param progress_callback: Callback function for receiving k8s container 
logs.
-        `progress_callback` is deprecated, please use :param `callbacks` 
instead.
     :param logging_interval: max time in seconds that task should be in 
deferred state before
         resuming to fetch the latest logs. If ``None``, then the task will 
remain in deferred state until pod
         is done, and no logs will be visible until that time.
@@ -404,19 +395,8 @@ class KubernetesPodOperator(BaseOperator):
         self.poll_interval = poll_interval
         self.remote_pod: k8s.V1Pod | None = None
         self.log_pod_spec_on_failure = log_pod_spec_on_failure
-        if is_delete_operator_pod is not None:
-            warnings.warn(
-                "`is_delete_operator_pod` parameter is deprecated, please use 
`on_finish_action`",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
-            )
-            self.on_finish_action = (
-                OnFinishAction.DELETE_POD if is_delete_operator_pod else 
OnFinishAction.KEEP_POD
-            )
-            self.is_delete_operator_pod = is_delete_operator_pod
-        else:
-            self.on_finish_action = OnFinishAction(on_finish_action)
-            self.is_delete_operator_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
+        self.on_finish_action = OnFinishAction(on_finish_action)
+        self.is_delete_operator_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
         self.termination_message_policy = termination_message_policy
         self.active_deadline_seconds = active_deadline_seconds
         self.logging_interval = logging_interval
@@ -512,9 +492,7 @@ class KubernetesPodOperator(BaseOperator):
 
     @cached_property
     def pod_manager(self) -> PodManager:
-        return PodManager(
-            kube_client=self.client, callbacks=self.callbacks, 
progress_callback=self._progress_callback
-        )
+        return PodManager(kube_client=self.client, callbacks=self.callbacks)
 
     @cached_property
     def hook(self) -> PodOperatorHookProtocol:
@@ -1161,10 +1139,6 @@ class KubernetesPodOperator(BaseOperator):
         pod = self.build_pod_request_obj()
         print(yaml.dump(prune_dict(pod.to_dict(), mode="strict")))
 
-    @deprecated(reason="use `trigger_reentry` instead.", 
category=AirflowProviderDeprecationWarning)
-    def execute_complete(self, context: Context, event: dict, **kwargs):
-        return self.trigger_reentry(context=context, event=event)
-
     def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) -> 
k8s.V1Pod:
         """
         Patch or delete the existing pod with duplicate labels.
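
Note that ``on_finish_action`` is still coerced to the ``OnFinishAction`` enum in ``__init__`` (see the hunk above), so the same string values apply; a tiny illustrative check:

    from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction

    # The strings accepted by KubernetesPodOperator and KubernetesPodTrigger map onto the enum:
    assert OnFinishAction("delete_pod") is OnFinishAction.DELETE_POD
    assert OnFishAction("keep_pod") is OnFinishAction.KEEP_POD if False else True  # noqa: illustrative only
    assert OnFinishAction("keep_pod") is OnFinishAction.KEEP_POD
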
diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py 
b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index acab68d85ff..ad529290d8e 100644
--- 
a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ 
b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -98,8 +98,6 @@ class SparkKubernetesOperator(KubernetesPodOperator):
         random_name_suffix: bool = True,
         **kwargs,
     ) -> None:
-        if kwargs.get("xcom_push") is not None:
-            raise AirflowException("'xcom_push' was deprecated, use 
'do_xcom_push' instead")
         super().__init__(name=name, **kwargs)
         self.image = image
         self.code_path = code_path
diff --git a/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py 
b/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
index cf11db539e3..78838d9b444 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py
@@ -34,23 +34,16 @@ from typing import TYPE_CHECKING
 
 import re2
 from dateutil import parser
-from deprecated import deprecated
 from kubernetes.client import models as k8s
 from kubernetes.client.api_client import ApiClient
 
 from airflow.exceptions import (
     AirflowConfigException,
     AirflowException,
-    AirflowProviderDeprecationWarning,
 )
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
     POD_NAME_MAX_LENGTH,
     add_unique_suffix,
-    rand_str,
-)
-from airflow.providers.cncf.kubernetes.pod_generator_deprecated import (
-    PodDefaults as PodDefaultsDeprecated,
-    PodGenerator as PodGeneratorDeprecated,
 )
 from airflow.utils import yaml
 from airflow.utils.hashlib_wrapper import md5
@@ -155,40 +148,6 @@ class PodGenerator:
         # Attach sidecar
         self.extract_xcom = extract_xcom
 
-    @deprecated(
-        reason="This method is deprecated and will be removed in the future 
releases",
-        category=AirflowProviderDeprecationWarning,
-    )
-    def gen_pod(self) -> k8s.V1Pod:
-        """Generate pod."""
-        result = self.ud_pod
-
-        result.metadata.name = add_unique_suffix(name=result.metadata.name)
-
-        if self.extract_xcom:
-            result = self.add_xcom_sidecar(result)
-
-        return result
-
-    @staticmethod
-    @deprecated(
-        reason=(
-            "This function is deprecated. "
-            "Please use 
airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead"
-        ),
-        category=AirflowProviderDeprecationWarning,
-    )
-    def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
-        """Add sidecar."""
-        pod_cp = copy.deepcopy(pod)
-        pod_cp.spec.volumes = pod.spec.volumes or []
-        pod_cp.spec.volumes.insert(0, PodDefaultsDeprecated.VOLUME)
-        pod_cp.spec.containers[0].volume_mounts = 
pod_cp.spec.containers[0].volume_mounts or []
-        pod_cp.spec.containers[0].volume_mounts.insert(0, 
PodDefaultsDeprecated.VOLUME_MOUNT)
-        pod_cp.spec.containers.append(PodDefaultsDeprecated.SIDECAR_CONTAINER)
-
-        return pod_cp
-
     @staticmethod
     def from_obj(obj) -> dict | k8s.V1Pod | None:
         """Convert to pod from obj."""
@@ -210,57 +169,11 @@ class PodGenerator:
 
         if isinstance(k8s_object, k8s.V1Pod):
             return k8s_object
-        elif isinstance(k8s_legacy_object, dict):
-            warnings.warn(
-                "Using a dictionary for the executor_config is deprecated and 
will soon be removed. "
-                'Please use a `kubernetes.client.models.V1Pod` class with a 
"pod_override" key'
-                " instead. ",
-                category=AirflowProviderDeprecationWarning,
-                stacklevel=2,
-            )
-            return PodGenerator.from_legacy_obj(obj)
         else:
             raise TypeError(
                 "Cannot convert a non-kubernetes.client.models.V1Pod object 
into a KubernetesExecutorConfig"
             )
 
-    @staticmethod
-    def from_legacy_obj(obj) -> k8s.V1Pod | None:
-        """Convert to pod from obj."""
-        if obj is None:
-            return None
-
-        # We do not want to extract constant here from ExecutorLoader because 
it is just
-        # A name in dictionary rather than executor selection mechanism and it 
causes cyclic import
-        namespaced = obj.get("KubernetesExecutor", {})
-
-        if not namespaced:
-            return None
-
-        resources = namespaced.get("resources")
-
-        if resources is None:
-            requests = {
-                "cpu": namespaced.pop("request_cpu", None),
-                "memory": namespaced.pop("request_memory", None),
-                "ephemeral-storage": namespaced.get("ephemeral-storage"),  # 
We pop this one in limits
-            }
-            limits = {
-                "cpu": namespaced.pop("limit_cpu", None),
-                "memory": namespaced.pop("limit_memory", None),
-                "ephemeral-storage": namespaced.pop("ephemeral-storage", None),
-            }
-            all_resources = list(requests.values()) + list(limits.values())
-            if all(r is None for r in all_resources):
-                resources = None
-            else:
-                # remove None's so they don't become 0's
-                requests = {k: v for k, v in requests.items() if v is not None}
-                limits = {k: v for k, v in limits.items() if v is not None}
-                resources = k8s.V1ResourceRequirements(requests=requests, 
limits=limits)
-        namespaced["resources"] = resources
-        return PodGeneratorDeprecated(**namespaced).gen_pod()
-
     @staticmethod
     def reconcile_pods(base_pod: k8s.V1Pod, client_pod: k8s.V1Pod | None) -> 
k8s.V1Pod:
         """
@@ -579,37 +492,6 @@ class PodGenerator:
         api_client = ApiClient()
         return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)
 
-    @staticmethod
-    @deprecated(
-        reason="This method is deprecated. Use `add_pod_suffix` in 
`kubernetes_helper_functions`.",
-        category=AirflowProviderDeprecationWarning,
-    )
-    def make_unique_pod_id(pod_id: str) -> str | None:
-        r"""
-        Generate a unique Pod name.
-
-        Kubernetes pod names must consist of one or more lowercase
-        rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
-        characters.
-
-        Name must pass the following regex for validation
-        ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
-
-        For more details, see:
-        
https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
-
-        :param pod_id: requested pod name
-        :return: ``str`` valid Pod name of appropriate length
-        """
-        if not pod_id:
-            return None
-
-        max_pod_id_len = 100  # arbitrarily chosen
-        suffix = rand_str(8)  # 8 seems good enough
-        base_pod_id_len = max_pod_id_len - len(suffix) - 1  # -1 for separator
-        trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.")
-        return f"{trimmed_pod_id}-{suffix}"
-
 
 def merge_objects(base_obj, client_obj):
     """
diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py 
b/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
deleted file mode 100644
index e93be85e368..00000000000
--- a/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
+++ /dev/null
@@ -1,320 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Launches pods."""
-
-from __future__ import annotations
-
-import json
-import math
-import time
-import warnings
-from typing import TYPE_CHECKING, cast
-
-import pendulum
-import tenacity
-from kubernetes import client, watch
-from kubernetes.client.rest import ApiException
-from kubernetes.stream import stream as kubernetes_stream
-from requests.exceptions import HTTPError
-
-from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
-from airflow.providers.cncf.kubernetes.pod_generator import 
PodDefaultsDeprecated
-from airflow.settings import pod_mutation_hook
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.state import State
-
-if TYPE_CHECKING:
-    from kubernetes.client.models.v1_pod import V1Pod
-
-warnings.warn(
-    """
-    Please use :mod: Please use 
`airflow.providers.cncf.kubernetes.utils.pod_manager`
-
-    To use this module install the provider package by installing this pip 
package:
-
-    https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/
-
-    """,
-    RemovedInAirflow3Warning,
-    stacklevel=2,
-)
-
-
-class PodStatus:
-    """Status of the pods."""
-
-    PENDING = "pending"
-    RUNNING = "running"
-    FAILED = "failed"
-    SUCCEEDED = "succeeded"
-
-
-class PodLauncher(LoggingMixin):
-    """
-    Deprecated class for launching pods.
-
-    Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager 
instead.
-    """
-
-    def __init__(
-        self,
-        kube_client: client.CoreV1Api = None,
-        in_cluster: bool = True,
-        cluster_context: str | None = None,
-        extract_xcom: bool = False,
-    ):
-        """
-        Launch pods; DEPRECATED.
-
-        Please use 
airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager
-        instead to create the launcher.
-
-        :param kube_client: kubernetes client
-        :param in_cluster: whether we are in cluster
-        :param cluster_context: context of the cluster
-        :param extract_xcom: whether we should extract xcom
-        """
-        super().__init__()
-        self._client = kube_client or get_kube_client(in_cluster=in_cluster, 
cluster_context=cluster_context)
-        self._watch = watch.Watch()
-        self.extract_xcom = extract_xcom
-
-    def run_pod_async(self, pod: V1Pod, **kwargs):
-        """Run pod asynchronously."""
-        pod_mutation_hook(pod)
-
-        sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
-        json_pod = json.dumps(sanitized_pod, indent=2)
-
-        self.log.debug("Pod Creation Request: \n%s", json_pod)
-        try:
-            resp = self._client.create_namespaced_pod(
-                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
-            )
-            self.log.debug("Pod Creation Response: %s", resp)
-        except Exception as e:
-            self.log.exception("Exception when attempting to create Namespaced 
Pod: %s", json_pod)
-            raise e
-        return resp
-
-    def delete_pod(self, pod: V1Pod):
-        """Delete pod."""
-        try:
-            self._client.delete_namespaced_pod(
-                pod.metadata.name, pod.metadata.namespace, 
body=client.V1DeleteOptions()
-            )
-        except ApiException as e:
-            # If the pod is already deleted
-            if str(e.status) != "404":
-                raise
-
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
-        """
-        Launch the pod synchronously and wait for completion.
-
-        :param pod:
-        :param startup_timeout: Timeout for startup of the pod (if pod is 
pending for too long, fails task)
-        :return:
-        """
-        resp = self.run_pod_async(pod)
-        start_time = time.monotonic()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                if time.monotonic() >= start_time + startup_timeout:
-                    raise AirflowException("Pod took too long to start")
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | 
None]:
-        """
-        Monitor a pod and return the final state.
-
-        :param pod: pod spec that will be monitored
-        :param get_logs: whether to read the logs locally
-        """
-        if get_logs:
-            read_logs_since_sec = None
-            last_log_time: pendulum.DateTime | None = None
-            while True:
-                logs = self.read_pod_logs(pod, timestamps=True, 
since_seconds=read_logs_since_sec)
-                for line in logs:
-                    timestamp, message = 
self.parse_log_line(line.decode("utf-8"))
-                    if timestamp:
-                        last_log_time = cast(pendulum.DateTime, 
pendulum.parse(timestamp))
-                    self.log.info(message)
-                time.sleep(1)
-
-                if not self.base_container_is_running(pod):
-                    break
-
-                self.log.warning("Pod %s log read interrupted", 
pod.metadata.name)
-                if last_log_time:
-                    delta = pendulum.now() - last_log_time
-                    # Prefer logs duplication rather than loss
-                    read_logs_since_sec = math.ceil(delta.total_seconds())
-        result = None
-        if self.extract_xcom:
-            while self.base_container_is_running(pod):
-                self.log.info("Container %s has state %s", pod.metadata.name, 
State.RUNNING)
-                time.sleep(2)
-            result = self._extract_xcom(pod)
-            self.log.info(result)
-            result = json.loads(result)
-        while self.pod_is_running(pod):
-            self.log.info("Pod %s has state %s", pod.metadata.name, 
State.RUNNING)
-            time.sleep(2)
-        return self._task_status(self.read_pod(pod)), result
-
-    def parse_log_line(self, line: str) -> tuple[str | None, str]:
-        """
-        Parse K8s log line and returns the final state.
-
-        :param line: k8s log line
-        :return: timestamp and log message
-        """
-        timestamp, sep, message = line.strip().partition(" ")
-        if not sep:
-            self.log.error(
-                "Error parsing timestamp (no timestamp in message: %r). "
-                "Will continue execution but won't update timestamp",
-                line,
-            )
-            return None, line
-        return timestamp, message
-
-    def _task_status(self, event):
-        self.log.info("Event: %s had an event of type %s", 
event.metadata.name, event.status.phase)
-        status = self.process_status(event.metadata.name, event.status.phase)
-        return status
-
-    def pod_not_started(self, pod: V1Pod):
-        """Test if pod has not started."""
-        state = self._task_status(self.read_pod(pod))
-        return state == State.QUEUED
-
-    def pod_is_running(self, pod: V1Pod):
-        """Test if pod is running."""
-        state = self._task_status(self.read_pod(pod))
-        return state not in (State.SUCCESS, State.FAILED)
-
-    def base_container_is_running(self, pod: V1Pod):
-        """Test if base container is running."""
-        event = self.read_pod(pod)
-        status = next((s for s in event.status.container_statuses if s.name == 
"base"), None)
-        if not status:
-            return False
-        return status.state.running is not None
-
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod_logs(
-        self,
-        pod: V1Pod,
-        tail_lines: int | None = None,
-        timestamps: bool = False,
-        since_seconds: int | None = None,
-    ):
-        """Read log from the pod."""
-        additional_kwargs = {}
-        if since_seconds:
-            additional_kwargs["since_seconds"] = since_seconds
-
-        if tail_lines:
-            additional_kwargs["tail_lines"] = tail_lines
-
-        try:
-            return self._client.read_namespaced_pod_log(
-                name=pod.metadata.name,
-                namespace=pod.metadata.namespace,
-                container="base",
-                follow=True,
-                timestamps=timestamps,
-                _preload_content=False,
-                **additional_kwargs,
-            )
-        except HTTPError as e:
-            raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
-
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod_events(self, pod):
-        """Read events from the pod."""
-        try:
-            return self._client.list_namespaced_event(
-                namespace=pod.metadata.namespace, 
field_selector=f"involvedObject.name={pod.metadata.name}"
-            )
-        except HTTPError as e:
-            raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
-
-    @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod(self, pod: V1Pod):
-        """Read pod information."""
-        try:
-            return self._client.read_namespaced_pod(pod.metadata.name, 
pod.metadata.namespace)
-        except HTTPError as e:
-            raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
-
-    def _extract_xcom(self, pod: V1Pod):
-        resp = kubernetes_stream(
-            self._client.connect_get_namespaced_pod_exec,
-            pod.metadata.name,
-            pod.metadata.namespace,
-            container=PodDefaultsDeprecated.SIDECAR_CONTAINER_NAME,
-            command=["/bin/sh"],
-            stdin=True,
-            stdout=True,
-            stderr=True,
-            tty=False,
-            _preload_content=False,
-        )
-        try:
-            result = self._exec_pod_command(resp, f"cat 
{PodDefaultsDeprecated.XCOM_MOUNT_PATH}/return.json")
-            self._exec_pod_command(resp, "kill -s SIGINT 1")
-        finally:
-            resp.close()
-        if result is None:
-            raise AirflowException(f"Failed to extract xcom from pod: 
{pod.metadata.name}")
-        return result
-
-    def _exec_pod_command(self, resp, command):
-        if resp.is_open():
-            self.log.info("Running command... %s\n", command)
-            resp.write_stdin(command + "\n")
-            while resp.is_open():
-                resp.update(timeout=1)
-                if resp.peek_stdout():
-                    return resp.read_stdout()
-                if resp.peek_stderr():
-                    self.log.info(resp.read_stderr())
-                    break
-        return None
-
-    def process_status(self, job_id, status):
-        """Process status information for the job."""
-        status = status.lower()
-        if status == PodStatus.PENDING:
-            return State.QUEUED
-        elif status == PodStatus.FAILED:
-            self.log.error("Event with job id %s Failed", job_id)
-            return State.FAILED
-        elif status == PodStatus.SUCCEEDED:
-            self.log.info("Event with job id %s Succeeded", job_id)
-            return State.SUCCESS
-        elif status == PodStatus.RUNNING:
-            return State.RUNNING
-        else:
-            self.log.error("Event: Invalid state %s on job %s", status, job_id)
-            return State.FAILED
diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py 
b/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py
deleted file mode 100644
index 17ddc9aee8f..00000000000
--- a/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module is deprecated. Please use 
:mod:`airflow.providers.cncf.kubernetes.triggers.pod` instead."""
-
-from __future__ import annotations
-
-import warnings
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.triggers.pod import *  # noqa: F403
-
-warnings.warn(
-    "This module is deprecated. Please use 
`airflow.providers.cncf.kubernetes.triggers.pod` instead.",
-    AirflowProviderDeprecationWarning,
-    stacklevel=2,
-)
diff --git a/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py 
b/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 102dc438006..51b86647f36 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -19,12 +19,10 @@ from __future__ import annotations
 import asyncio
 import datetime
 import traceback
-import warnings
 from enum import Enum
 from functools import cached_property
 from typing import TYPE_CHECKING, Any, AsyncIterator
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import 
AsyncKubernetesHook
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
     OnFinishAction,
@@ -71,10 +69,6 @@ class KubernetesPodTrigger(BaseTrigger):
     :param on_finish_action: What to do when the pod reaches its final state, 
or the execution is interrupted.
         If "delete_pod", the pod will be deleted regardless its state; if 
"delete_succeeded_pod",
         only succeeded pod will be deleted. You can set to "keep_pod" to keep 
the pod.
-    :param should_delete_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted. If True (default), delete the
-        pod; if False, leave the pod.
-        Deprecated - use `on_finish_action` instead.
     :param logging_interval: number of seconds to wait before kicking it back 
to
         the operator to print latest logs. If ``None`` will wait until 
container done.
     :param last_log_time: where to resume logs from
@@ -95,7 +89,6 @@ class KubernetesPodTrigger(BaseTrigger):
         startup_timeout: int = 120,
         startup_check_interval: int = 5,
         on_finish_action: str = "delete_pod",
-        should_delete_pod: bool | None = None,
         last_log_time: DateTime | None = None,
         logging_interval: int | None = None,
     ):
@@ -114,20 +107,7 @@ class KubernetesPodTrigger(BaseTrigger):
         self.startup_check_interval = startup_check_interval
         self.last_log_time = last_log_time
         self.logging_interval = logging_interval
-
-        if should_delete_pod is not None:
-            warnings.warn(
-                "`should_delete_pod` parameter is deprecated, please use 
`on_finish_action`",
-                category=AirflowProviderDeprecationWarning,
-                stacklevel=2,
-            )
-            self.on_finish_action = (
-                OnFinishAction.DELETE_POD if should_delete_pod else 
OnFinishAction.KEEP_POD
-            )
-            self.should_delete_pod = should_delete_pod
-        else:
-            self.on_finish_action = OnFinishAction(on_finish_action)
-            self.should_delete_pod = self.on_finish_action == 
OnFinishAction.DELETE_POD
+        self.on_finish_action = OnFinishAction(on_finish_action)
 
         self._since_time = None
 
@@ -148,7 +128,6 @@ class KubernetesPodTrigger(BaseTrigger):
                 "startup_timeout": self.startup_timeout,
                 "startup_check_interval": self.startup_check_interval,
                 "trigger_start_time": self.trigger_start_time,
-                "should_delete_pod": self.should_delete_pod,
                 "on_finish_action": self.on_finish_action.value,
                 "last_log_time": self.last_log_time,
                 "logging_interval": self.logging_interval,
diff --git 
a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index cd91dc09281..aa8e812921c 100644
--- a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -26,11 +26,10 @@ from collections.abc import Iterable
 from contextlib import closing, suppress
 from dataclasses import dataclass
 from datetime import timedelta
-from typing import TYPE_CHECKING, Callable, Generator, Protocol, cast
+from typing import TYPE_CHECKING, Generator, Protocol, cast
 
 import pendulum
 import tenacity
-from deprecated import deprecated
 from kubernetes import client, watch
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
@@ -39,7 +38,7 @@ from pendulum.parsing.exceptions import ParserError
 from typing_extensions import Literal
 from urllib3.exceptions import HTTPError, TimeoutError
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, 
KubernetesPodOperatorCallback
 from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -302,19 +301,15 @@ class PodManager(LoggingMixin):
         self,
         kube_client: client.CoreV1Api,
         callbacks: type[KubernetesPodOperatorCallback] | None = None,
-        progress_callback: Callable[[str], None] | None = None,
     ):
         """
         Create the launcher.
 
         :param kube_client: kubernetes client
         :param callbacks:
-        :param progress_callback: Callback function invoked when fetching 
container log.
-            This parameter is deprecated, please use ````
         """
         super().__init__()
         self._client = kube_client
-        self._progress_callback = progress_callback
         self._watch = watch.Watch()
         self._callbacks = callbacks
 
@@ -383,16 +378,6 @@ class PodManager(LoggingMixin):
                 raise PodLaunchFailedException(msg)
             time.sleep(startup_check_interval)
 
-    @deprecated(
-        reason=(
-            "Method `follow_container_logs` is deprecated.  Use 
`fetch_container_logs` instead "
-            "with option `follow=True`."
-        ),
-        category=AirflowProviderDeprecationWarning,
-    )
-    def follow_container_logs(self, pod: V1Pod, container_name: str) -> 
PodLoggingStatus:
-        return self.fetch_container_logs(pod=pod, 
container_name=container_name, follow=True)
-
     def fetch_container_logs(
         self,
         pod: V1Pod,
@@ -461,8 +446,6 @@ class PodManager(LoggingMixin):
                                 progress_callback_lines.append(line)
                             else:  # previous log line is complete
                                 for line in progress_callback_lines:
-                                    if self._progress_callback:
-                                        self._progress_callback(line)
                                     if self._callbacks:
                                         self._callbacks.progress_callback(
                                             line=line, client=self._client, 
mode=ExecutionMode.SYNC
@@ -479,8 +462,6 @@ class PodManager(LoggingMixin):
                 finally:
                     # log the last line and update the last_captured_timestamp
                     for line in progress_callback_lines:
-                        if self._progress_callback:
-                            self._progress_callback(line)
                         if self._callbacks:
                             self._callbacks.progress_callback(
                                 line=line, client=self._client, 
mode=ExecutionMode.SYNC
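
For completeness, the removed ``follow_container_logs`` wrapper reduces to a single call on the remaining API. A small sketch; the helper name and the "base" container are illustrative, not part of this commit:

    from kubernetes.client import CoreV1Api

    from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager


    def follow_base_container_logs(pod, kube_client: CoreV1Api):
        """What pod_manager.follow_container_logs(pod, "base") used to do."""
        manager = PodManager(kube_client=kube_client)  # the progress_callback kwarg is gone
        return manager.fetch_container_logs(pod=pod, container_name="base", follow=True)
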
diff --git 
a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
index ea143edd829..f5db8b806af 100644
--- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -28,7 +28,7 @@ from kubernetes.client import models as k8s
 from kubernetes.client.rest import ApiException
 from urllib3 import HTTPResponse
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException
 from airflow.executors.executor_constants import (
     CELERY_EXECUTOR,
     CELERY_KUBERNETES_EXECUTOR,
@@ -52,12 +52,12 @@ from 
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils impor
     get_base_pod_from_template,
 )
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
+    add_unique_suffix,
     annotations_for_logging_task_metadata,
     annotations_to_key,
     create_unique_id,
     get_logs_task_metadata,
 )
-from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.utils import timezone
 from airflow.utils.state import State, TaskInstanceState
 
@@ -106,8 +106,7 @@ class TestAirflowKubernetesScheduler:
 
     def test_create_pod_id(self):
         for dag_id, task_id in self._cases():
-            with pytest.warns(AirflowProviderDeprecationWarning, 
match=r"deprecated\. Use `add_pod_suffix`"):
-                pod_name = 
PodGenerator.make_unique_pod_id(create_unique_id(dag_id, task_id))
+            pod_name = add_unique_suffix(name=create_unique_id(dag_id, 
task_id))
             assert self._is_valid_pod_id(pod_name), f"dag_id={dag_id!r}, 
task_id={task_id!r}"
 
     @mock.patch("airflow.providers.cncf.kubernetes.pod_generator.PodGenerator")
diff --git a/providers/tests/cncf/kubernetes/models/test_secret.py 
b/providers/tests/cncf/kubernetes/models/test_secret.py
index 51fd17ca71f..1f299529687 100644
--- a/providers/tests/cncf/kubernetes/models/test_secret.py
+++ b/providers/tests/cncf/kubernetes/models/test_secret.py
@@ -62,12 +62,9 @@ class TestSecret:
         )
 
     @mock.patch("uuid.uuid4")
-    @mock.patch("airflow.providers.cncf.kubernetes.pod_generator.rand_str")
-    def test_attach_to_pod(self, mock_rand_str, mock_uuid, data_file):
+    def test_attach_to_pod(self, mock_uuid, data_file):
         static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
         mock_uuid.return_value = static_uuid
-        rand_str = "abcd1234"
-        mock_rand_str.return_value = rand_str
         template_file = data_file("pods/generator_base.yaml").as_posix()
         pod = PodGenerator(pod_template_file=template_file).ud_pod
         secrets = [
diff --git a/providers/tests/cncf/kubernetes/operators/test_pod.py 
b/providers/tests/cncf/kubernetes/operators/test_pod.py
index a39b1c6d889..d6b7378c80c 100644
--- a/providers/tests/cncf/kubernetes/operators/test_pod.py
+++ b/providers/tests/cncf/kubernetes/operators/test_pod.py
@@ -31,7 +31,6 @@ from urllib3 import HTTPResponse
 
 from airflow.exceptions import (
     AirflowException,
-    AirflowProviderDeprecationWarning,
     AirflowSkipException,
     TaskDeferred,
 )
@@ -1305,31 +1304,6 @@ class TestKubernetesPodOperator:
         else:
             mock_await.assert_not_called()
 
-    @pytest.mark.parametrize(
-        "on_finish_action",
-        # Regardless what we provide in `on_finish_action`
-        # it doesn't take any affect if `is_delete_operator_pod` provided.
-        [*sorted(OnFinishAction.__members__.values()), None],
-    )
-    @pytest.mark.parametrize(
-        "is_delete_operator_pod, expected_on_finish_action",
-        [
-            pytest.param(True, "delete_pod", id="delete-operator-pod"),
-            pytest.param(False, "keep_pod", id="keep-operator-pod"),
-        ],
-    )
-    def test_deprecated_is_delete_operator_pod(
-        self, is_delete_operator_pod, expected_on_finish_action, 
on_finish_action
-    ):
-        with pytest.warns(AirflowProviderDeprecationWarning, match="please use 
`on_finish_action`"):
-            op = KubernetesPodOperator(
-                task_id="task",
-                is_delete_operator_pod=is_delete_operator_pod,
-                on_finish_action=on_finish_action,
-            )
-        assert op.is_delete_operator_pod == is_delete_operator_pod
-        assert op.on_finish_action == expected_on_finish_action
-
     @pytest.mark.parametrize(
         "task_kwargs, should_fail, should_be_deleted",
         [
@@ -2272,15 +2246,6 @@ class TestKubernetesPodOperatorAsync:
                 },
             )
 
-    def test_deprecated_execute_complete(self):
-        fake_context = mock.sentinel.context
-        fake_event = mock.sentinel.event
-        with mock.patch.object(KubernetesPodOperator, "trigger_reentry") as 
mocked_trigger_reentry:
-            op = KubernetesPodOperator(task_id="test-task")
-            with pytest.warns(AirflowProviderDeprecationWarning, match="use 
`trigger_reentry` instead"):
-                op.execute_complete(fake_context, fake_event)
-        mocked_trigger_reentry.assert_called_once_with(context=fake_context, 
event=fake_event)
-
 
 @pytest.mark.parametrize("do_xcom_push", [True, False])
 @patch(KUB_OP_PATH.format("extract_xcom"))
diff --git 
a/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py 
b/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
index bfe4f98f3e1..5dce063d572 100644
--- a/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
+++ b/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py
@@ -18,12 +18,10 @@
 from __future__ import annotations
 
 import re
-from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
create_pod_id, create_unique_id
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
create_unique_id
 
 pod_name_regex = 
r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
 
@@ -110,20 +108,3 @@ class TestCreateUniqueId:
             assert re.match(r"-[a-z0-9]{8}", actual[-9:])
         else:
             assert actual == base[:length]
-
-    @pytest.mark.parametrize("dag_id", ["fake-dag", None])
-    @pytest.mark.parametrize("task_id", ["fake-task", None])
-    @pytest.mark.parametrize("max_length", [10, 42, None])
-    @pytest.mark.parametrize("unique", [True, False])
-    def test_back_compat_create_pod_id(self, dag_id, task_id, max_length, 
unique):
-        with mock.patch(
-            
"airflow.providers.cncf.kubernetes.kubernetes_helper_functions.create_unique_id"
-        ) as mocked_create_unique_id:
-            with pytest.warns(
-                AirflowProviderDeprecationWarning, match=r"deprecated. Please 
use `create_unique_id`"
-            ):
-                create_pod_id(dag_id, task_id, max_length=max_length, 
unique=unique)
-
-        mocked_create_unique_id.assert_called_once_with(
-            dag_id=dag_id, task_id=task_id, max_length=max_length, 
unique=unique
-        )
diff --git a/providers/tests/cncf/kubernetes/test_pod_generator.py 
b/providers/tests/cncf/kubernetes/test_pod_generator.py
index 1a18693852a..30d1df2a0d7 100644
--- a/providers/tests/cncf/kubernetes/test_pod_generator.py
+++ b/providers/tests/cncf/kubernetes/test_pod_generator.py
@@ -26,10 +26,10 @@ from dateutil import parser
 from kubernetes.client import ApiClient, models as k8s
 
 from airflow import __version__
-from airflow.exceptions import AirflowConfigException, 
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowConfigException
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import 
PodReconciliationError
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
add_unique_suffix
 from airflow.providers.cncf.kubernetes.pod_generator import (
-    PodDefaultsDeprecated,
     PodGenerator,
     datetime_to_label_safe_datestring,
     extend_object_field,
@@ -160,41 +160,6 @@ class TestPodGenerator:
             ),
         )
 
-    
@mock.patch("airflow.providers.cncf.kubernetes.kubernetes_helper_functions.rand_str")
-    def test_gen_pod_extract_xcom(self, mock_rand_str, data_file):
-        """
-        Method gen_pod is used nowhere in codebase and is deprecated.
-        This test is only retained for backcompat.
-        """
-        mock_rand_str.return_value = self.rand_str
-        template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix()
-
-        pod_generator = PodGenerator(pod_template_file=template_file, extract_xcom=True)
-        with pytest.warns(AirflowProviderDeprecationWarning):
-            result = pod_generator.gen_pod()
-        container_two = {
-            "name": "airflow-xcom-sidecar",
-            "image": "alpine",
-            "command": ["sh", "-c", PodDefaultsDeprecated.XCOM_CMD],
-            "volumeMounts": [{"name": "xcom", "mountPath": "/airflow/xcom"}],
-            "resources": {"requests": {"cpu": "1m"}},
-        }
-        self.expected.spec.containers.append(container_two)
-        base_container: k8s.V1Container = self.expected.spec.containers[0]
-        base_container.volume_mounts = base_container.volume_mounts or []
-        base_container.volume_mounts.append(k8s.V1VolumeMount(name="xcom", mount_path="/airflow/xcom"))
-        self.expected.spec.containers[0] = base_container
-        self.expected.spec.volumes = self.expected.spec.volumes or []
-        self.expected.spec.volumes.append(
-            k8s.V1Volume(
-                name="xcom",
-                empty_dir={},
-            )
-        )
-        result_dict = self.k8s_client.sanitize_for_serialization(result)
-        expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)
-        assert result_dict == expected_dict
-
     def test_from_obj_pod_override_object(self):
         obj = {
             "pod_override": k8s.V1Pod(
@@ -240,54 +205,6 @@ class TestPodGenerator:
             },
         }
 
-    def test_from_obj_legacy(self):
-        obj = {
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
-            }
-        }
-        with pytest.warns(
-            AirflowProviderDeprecationWarning,
-            match="Using a dictionary for the executor_config is deprecated 
and will soon be removed",
-        ):
-            result = PodGenerator.from_obj(obj)
-
-        assert self.k8s_client.sanitize_for_serialization(result) == {
-            "apiVersion": "v1",
-            "kind": "Pod",
-            "metadata": {
-                "annotations": {"test": "annotation"},
-            },
-            "spec": {
-                "containers": [
-                    {
-                        "args": [],
-                        "command": [],
-                        "env": [],
-                        "envFrom": [],
-                        "name": "base",
-                        "ports": [],
-                        "volumeMounts": [{"mountPath": "/foo/", "name": 
"example-kubernetes-test-volume"}],
-                    }
-                ],
-                "hostNetwork": False,
-                "imagePullSecrets": [],
-                "volumes": [{"hostPath": {"path": "/tmp/"}, "name": 
"example-kubernetes-test-volume"}],
-            },
-        }
-
     def test_from_obj_both(self):
         obj = {
             "pod_override": k8s.V1Pod(
@@ -725,16 +642,13 @@ class TestPodGenerator:
         ),
     )
     def test_pod_name_confirm_to_max_length(self, input):
-        with pytest.warns(
-            AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in `kubernetes_helper_functions`"
-        ):
-            actual = PodGenerator.make_unique_pod_id(input)
-        assert len(actual) <= 100
+        actual = add_unique_suffix(name=input)
+        assert len(actual) <= 63
         actual_base, actual_suffix = actual.rsplit("-", maxsplit=1)
         # we limit pod id length to 100
         # random suffix is 8 chars plus the '-' separator
-        # so actual pod id base should first 91 chars of requested pod id
-        assert actual_base == input[:91]
+        # so actual pod id base should first 55 chars of requested pod id
+        assert actual_base == input[:54]
         # suffix should always be 8, the random alphanum
         assert re.match(r"^[a-z0-9]{8}$", actual_suffix)
 
@@ -743,7 +657,7 @@ class TestPodGenerator:
         (
             (
                 "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen-",
-                "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen",
+                "somewhat-long-pod-name-maybe-longer-than-previously-su",
             ),
             ("pod-name-with-hyphen-", "pod-name-with-hyphen"),
             ("pod-name-with-double-hyphen--", "pod-name-with-double-hyphen"),
@@ -759,10 +673,7 @@ class TestPodGenerator:
         `make_unique_pod_id` doesn't actually guarantee that the regex passes for any input.
         But I guess this test verifies that an otherwise valid pod_id doesn't get _screwed up_.
         """
-        with pytest.warns(
-            AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in `kubernetes_helper_functions`"
-        ):
-            actual = PodGenerator.make_unique_pod_id(pod_id)
+        actual = add_unique_suffix(name=pod_id)
         assert len(actual) <= 253
         assert actual == actual.lower(), "not lowercase"
         # verify using official k8s regex
diff --git a/providers/tests/cncf/kubernetes/triggers/test_pod.py b/providers/tests/cncf/kubernetes/triggers/test_pod.py
index 0e7522480a8..9b3a21d023f 100644
--- a/providers/tests/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/tests/cncf/kubernetes/triggers/test_pod.py
@@ -108,7 +108,6 @@ class TestKubernetesPodTrigger:
             "startup_check_interval": 5,
             "trigger_start_time": TRIGGER_START_TIME,
             "on_finish_action": ON_FINISH_ACTION,
-            "should_delete_pod": ON_FINISH_ACTION == "delete_pod",
             "last_log_time": None,
             "logging_interval": None,
         }
diff --git a/providers/tests/cncf/kubernetes/utils/test_pod_manager.py b/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
index b577ea969ea..d220befc219 100644
--- a/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/tests/cncf/kubernetes/utils/test_pod_manager.py
@@ -49,12 +49,10 @@ if TYPE_CHECKING:
 
 class TestPodManager:
     def setup_method(self):
-        self.mock_progress_callback = mock.Mock()
         self.mock_kube_client = mock.Mock()
         self.pod_manager = PodManager(
             kube_client=self.mock_kube_client,
             callbacks=MockKubernetesPodOperatorCallback,
-            progress_callback=self.mock_progress_callback,
         )
 
     def test_read_pod_logs_successfully_returns_logs(self):
@@ -294,19 +292,6 @@ class TestPodManager:
 
         assert status.last_log_time == cast("DateTime", pendulum.parse(timestamp_string))
 
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
-    def test_fetch_container_logs_invoke_deprecated_progress_callback(
-        self, mock_read_pod_logs, mock_container_is_running
-    ):
-        message = "2020-10-08T14:16:17.793417674Z message"
-        no_ts_message = "notimestamp"
-        mock_read_pod_logs.return_value = [bytes(message, "utf-8"), bytes(no_ts_message, "utf-8")]
-        mock_container_is_running.return_value = False
-
-        self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
-        self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)])
-
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
     def test_fetch_container_logs_invoke_progress_callback(
@@ -352,7 +337,6 @@ class TestPodManager:
             mock_container_is_running.side_effect = [True, True, False]
             status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
         assert status.last_log_time == cast("DateTime", pendulum.parse(last_timestamp_string))
-        assert self.mock_progress_callback.call_count == expected_call_count
         assert mock_callbacks.progress_callback.call_count == expected_call_count
 
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
diff --git a/providers/tests/google/cloud/triggers/test_kubernetes_engine.py b/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
index 844e2f96332..076ce183710 100644
--- a/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
+++ b/providers/tests/google/cloud/triggers/test_kubernetes_engine.py
@@ -27,7 +27,13 @@ import pytest
 from google.cloud.container_v1.types import Operation
 from kubernetes.client import models as k8s
 
-from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import ContainerState
+try:
+    from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState
+except ImportError:
+    # preserve backward compatibility for older versions of cncf.kubernetes provider, remove this when minimum cncf.kubernetes provider is 10.0
+    from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import (  # type: ignore[no-redef]
+        ContainerState,
+    )
 from airflow.providers.google.cloud.triggers.kubernetes_engine import (
     GKEJobTrigger,
     GKEOperationTrigger,
diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py
index ab5ebcac697..7d5756b2e7e 100755
--- a/scripts/in_container/run_provider_yaml_files_check.py
+++ b/scripts/in_container/run_provider_yaml_files_check.py
@@ -48,8 +48,6 @@ from airflow.providers_manager import ProvidersManager
 DEPRECATED_MODULES = [
     "airflow.providers.apache.hdfs.sensors.hdfs",
     "airflow.providers.apache.hdfs.hooks.hdfs",
-    "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod",
-    "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
     "airflow.providers.tabular.hooks.tabular",
     "airflow.providers.yandex.hooks.yandexcloud_dataproc",
     "airflow.providers.yandex.operators.yandexcloud_dataproc",
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 683fa7ce797..78534accd18 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -555,10 +555,6 @@ class TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
 class TestCncfProviderProjectStructure(ExampleCoverageTest):
     PROVIDER = "cncf"
     CLASS_DIRS = ProjectStructureTest.CLASS_DIRS
-    DEPRECATED_CLASSES = {
-        "airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
-        "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod",
-    }
     BASE_CLASSES = {"airflow.providers.cncf.kubernetes.operators.resource.KubernetesResourceBaseOperator"}
 
 
