This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3169e273b6f Update Kubernetes provider for Airflow 3.0 compatibility (#52664)
3169e273b6f is described below

commit 3169e273b6fea1e38fd8ce75bf4e72d319db10df
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jul 2 01:45:09 2025 +0530

    Update Kubernetes provider for Airflow 3.0 compatibility (#52664)
    
    Part of #52378
---
 .../cncf/kubernetes/decorators/kubernetes.py       | 16 ++++---------
 .../cncf/kubernetes/decorators/kubernetes_cmd.py   | 15 ++++--------
 .../providers/cncf/kubernetes/operators/job.py     |  2 +-
 .../providers/cncf/kubernetes/operators/kueue.py   |  2 +-
 .../providers/cncf/kubernetes/operators/pod.py     |  2 +-
 .../cncf/kubernetes/operators/resource.py          |  2 +-
 .../cncf/kubernetes/sensors/spark_kubernetes.py    |  7 +-----
 .../providers/cncf/kubernetes/version_compat.py    | 27 ++++++++++++++++++++++
 8 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index eb4752c0ffb..eb86ac9fb50 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -27,21 +27,15 @@ from typing import TYPE_CHECKING
 import dill
 from kubernetes.client import models as k8s
 
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory
-else:
-    from airflow.decorators.base import (  # type: ignore[no-redef]
-        DecoratedOperator,
-        TaskDecorator,
-        task_decorator_factory,
-    )
-
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from airflow.providers.cncf.kubernetes.python_kubernetes_script import (
     write_python_script,
 )
+from airflow.providers.cncf.kubernetes.version_compat import (
+    DecoratedOperator,
+    TaskDecorator,
+    task_decorator_factory,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
index 78e89d44cde..91d541ddf8e 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
@@ -20,17 +20,12 @@ import warnings
 from collections.abc import Callable, Sequence
 from typing import TYPE_CHECKING
 
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory
-else:
-    from airflow.decorators.base import (  # type: ignore[no-redef]
-        DecoratedOperator,
-        TaskDecorator,
-        task_decorator_factory,
-    )
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.version_compat import (
+    DecoratedOperator,
+    TaskDecorator,
+    task_decorator_factory,
+)
 from airflow.utils.context import context_merge
 from airflow.utils.operator_helpers import determine_kwargs
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 31ebbb752fc..8e7ef694b9c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -32,7 +32,6 @@ from kubernetes.client.rest import ApiException
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
     add_unique_suffix,
@@ -42,6 +41,7 @@ from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperato
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, merge_objects
 from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
 from airflow.providers.cncf.kubernetes.utils.pod_manager import EMPTY_XCOM_RESULT, PodNotFoundException
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
 from airflow.utils import yaml
 from airflow.utils.context import Context
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
index dba42c7d239..e0c7c549e90 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
@@ -25,9 +25,9 @@ from functools import cached_property
 from kubernetes.utils import FailToCreateError
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
 
 
 class KubernetesInstallKueueOperator(BaseOperator):
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index e01c5f43783..a46713ef3f3 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -46,7 +46,6 @@ from airflow.exceptions import (
     AirflowSkipException,
     TaskDeferred,
 )
-from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes import pod_generator
 from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
     convert_affinity,
@@ -81,6 +80,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
     container_is_succeeded,
     get_container_termination_message,
 )
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
 from airflow.settings import pod_mutation_hook
 from airflow.utils import yaml
 from airflow.utils.helpers import prune_dict, validate_key
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
index cfedf84e709..e095bd7802d 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -28,11 +28,11 @@ import yaml
 from kubernetes.utils import create_from_yaml
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
 from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
 from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
 
 if TYPE_CHECKING:
     from kubernetes.client import ApiClient, CustomObjectsApi
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 50e713e134a..48cb809a47c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -25,12 +25,7 @@ from kubernetes import client
 
 from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.sdk import BaseSensorOperator
-else:
-    from airflow.sensors.base import BaseSensorOperator  # type: ignore[no-redef]
+from airflow.providers.cncf.kubernetes.version_compat import BaseSensorOperator
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
index 48d122b6696..1815e33c611 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
@@ -33,3 +33,30 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.sdk import BaseOperator
+else:
+    from airflow.models import BaseOperator  # type: ignore[no-redef]
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import BaseSensorOperator
+    from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory
+else:
+    from airflow.decorators.base import (  # type: ignore[no-redef]
+        DecoratedOperator,
+        TaskDecorator,
+        task_decorator_factory,
+    )
+    from airflow.sensors.base import BaseSensorOperator  # type: ignore[no-redef]
+
+__all__ = [
+    "AIRFLOW_V_3_0_PLUS",
+    "AIRFLOW_V_3_1_PLUS",
+    "BaseOperator",
+    "BaseSensorOperator",
+    "DecoratedOperator",
+    "TaskDecorator",
+    "task_decorator_factory",
+]

Reply via email to