This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3169e273b6f Update Kubernetes provider for Airflow 3.0 compatibility
(#52664)
3169e273b6f is described below
commit 3169e273b6fea1e38fd8ce75bf4e72d319db10df
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jul 2 01:45:09 2025 +0530
Update Kubernetes provider for Airflow 3.0 compatibility (#52664)
Part of #52378
---
.../cncf/kubernetes/decorators/kubernetes.py | 16 ++++---------
.../cncf/kubernetes/decorators/kubernetes_cmd.py | 15 ++++--------
.../providers/cncf/kubernetes/operators/job.py | 2 +-
.../providers/cncf/kubernetes/operators/kueue.py | 2 +-
.../providers/cncf/kubernetes/operators/pod.py | 2 +-
.../cncf/kubernetes/operators/resource.py | 2 +-
.../cncf/kubernetes/sensors/spark_kubernetes.py | 7 +-----
.../providers/cncf/kubernetes/version_compat.py | 27 ++++++++++++++++++++++
8 files changed, 42 insertions(+), 31 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index eb4752c0ffb..eb86ac9fb50 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -27,21 +27,15 @@ from typing import TYPE_CHECKING
import dill
from kubernetes.client import models as k8s
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator,
task_decorator_factory
-else:
- from airflow.decorators.base import ( # type: ignore[no-redef]
- DecoratedOperator,
- TaskDecorator,
- task_decorator_factory,
- )
-
from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator
from airflow.providers.cncf.kubernetes.python_kubernetes_script import (
write_python_script,
)
+from airflow.providers.cncf.kubernetes.version_compat import (
+ DecoratedOperator,
+ TaskDecorator,
+ task_decorator_factory,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
index 78e89d44cde..91d541ddf8e 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
@@ -20,17 +20,12 @@ import warnings
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator,
task_decorator_factory
-else:
- from airflow.decorators.base import ( # type: ignore[no-redef]
- DecoratedOperator,
- TaskDecorator,
- task_decorator_factory,
- )
from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.version_compat import (
+ DecoratedOperator,
+ TaskDecorator,
+ task_decorator_factory,
+)
from airflow.utils.context import context_merge
from airflow.utils.operator_helpers import determine_kwargs
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 31ebbb752fc..8e7ef694b9c 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -32,7 +32,6 @@ from kubernetes.client.rest import ApiException
from airflow.configuration import conf
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
add_unique_suffix,
@@ -42,6 +41,7 @@ from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperato
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator,
merge_objects
from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import
EMPTY_XCOM_RESULT, PodNotFoundException
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
from airflow.utils import yaml
from airflow.utils.context import Context
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
index dba42c7d239..e0c7c549e90 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py
@@ -25,9 +25,9 @@ from functools import cached_property
from kubernetes.utils import FailToCreateError
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.operators.job import
KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
class KubernetesInstallKueueOperator(BaseOperator):
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index e01c5f43783..a46713ef3f3 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -46,7 +46,6 @@ from airflow.exceptions import (
AirflowSkipException,
TaskDeferred,
)
-from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters
import (
convert_affinity,
@@ -81,6 +80,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager
import (
container_is_succeeded,
get_container_termination_message,
)
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
from airflow.settings import pod_mutation_hook
from airflow.utils import yaml
from airflow.utils.helpers import prune_dict, validate_key
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
index cfedf84e709..e095bd7802d 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -28,11 +28,11 @@ import yaml
from kubernetes.utils import create_from_yaml
from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
should_retry_creation
from airflow.providers.cncf.kubernetes.utils.delete_from import
delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import
k8s_resource_iterator
+from airflow.providers.cncf.kubernetes.version_compat import BaseOperator
if TYPE_CHECKING:
from kubernetes.client import ApiClient, CustomObjectsApi
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 50e713e134a..48cb809a47c 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -25,12 +25,7 @@ from kubernetes import client
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
-from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
- from airflow.sdk import BaseSensorOperator
-else:
- from airflow.sensors.base import BaseSensorOperator # type:
ignore[no-redef]
+from airflow.providers.cncf.kubernetes.version_compat import BaseSensorOperator
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
index 48d122b6696..1815e33c611 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
@@ -33,3 +33,30 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
+
+if AIRFLOW_V_3_1_PLUS:
+ from airflow.sdk import BaseOperator
+else:
+ from airflow.models import BaseOperator # type: ignore[no-redef]
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk import BaseSensorOperator
+ from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator,
task_decorator_factory
+else:
+ from airflow.decorators.base import ( # type: ignore[no-redef]
+ DecoratedOperator,
+ TaskDecorator,
+ task_decorator_factory,
+ )
+ from airflow.sensors.base import BaseSensorOperator # type:
ignore[no-redef]
+
+__all__ = [
+ "AIRFLOW_V_3_0_PLUS",
+ "AIRFLOW_V_3_1_PLUS",
+ "BaseOperator",
+ "BaseSensorOperator",
+ "DecoratedOperator",
+ "TaskDecorator",
+ "task_decorator_factory",
+]