This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e3897dcbed Remove compat code for 2.7.0 - its now the min Airflow
version (#39591)
e3897dcbed is described below
commit e3897dcbed0262b0cab7a357f8d7fbbb6c4f4eeb
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon May 13 16:01:44 2024 -0600
Remove compat code for 2.7.0 - its now the min Airflow version (#39591)
---
.../providers/celery/executors/celery_executor.py | 44 ++++++---------------
.../kubernetes/executors/kubernetes_executor.py | 46 +++++++---------------
.../executors/kubernetes_executor_utils.py | 19 +--------
3 files changed, 29 insertions(+), 80 deletions(-)
diff --git a/airflow/providers/celery/executors/celery_executor.py
b/airflow/providers/celery/executors/celery_executor.py
index d81e57a222..9f1948db52 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -37,37 +37,19 @@ from celery import states as celery_states
from packaging.version import Version
from airflow import __version__ as airflow_version
-
-try:
- from airflow.cli.cli_config import (
- ARG_DAEMON,
- ARG_LOG_FILE,
- ARG_PID,
- ARG_SKIP_SERVE_LOGS,
- ARG_STDERR,
- ARG_STDOUT,
- ARG_VERBOSE,
- ActionCommand,
- Arg,
- GroupCommand,
- lazy_load_command,
- )
-except ImportError:
- import packaging.version
-
- from airflow.exceptions import AirflowOptionalProviderFeatureException
-
- base_version = packaging.version.parse(airflow_version).base_version
-
- if packaging.version.parse(base_version) <
packaging.version.parse("2.7.0"):
- raise AirflowOptionalProviderFeatureException(
- "Celery Executor from Celery Provider should only be used with
Airflow 2.7.0+.\n"
- f"This is Airflow {airflow_version} and Celery and
CeleryKubernetesExecutor are "
- f"available in the 'airflow.executors' package. You should not use
"
- f"the provider's executors in this version of Airflow."
- )
- raise
-
+from airflow.cli.cli_config import (
+ ARG_DAEMON,
+ ARG_LOG_FILE,
+ ARG_PID,
+ ARG_SKIP_SERVE_LOGS,
+ ARG_STDERR,
+ ARG_STDOUT,
+ ARG_VERBOSE,
+ ActionCommand,
+ Arg,
+ GroupCommand,
+ lazy_load_command,
+)
from airflow.configuration import conf
from airflow.exceptions import AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index f62f021fd2..43cefeb9c4 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -38,38 +38,18 @@ from typing import TYPE_CHECKING, Any, Sequence
from kubernetes.dynamic import DynamicClient
from sqlalchemy import select, update
-from airflow.providers.cncf.kubernetes.pod_generator import
PodMutationHookException, PodReconciliationError
-from airflow.stats import Stats
-
-try:
- from airflow.cli.cli_config import (
- ARG_DAG_ID,
- ARG_EXECUTION_DATE,
- ARG_OUTPUT_PATH,
- ARG_SUBDIR,
- ARG_VERBOSE,
- ActionCommand,
- Arg,
- GroupCommand,
- lazy_load_command,
- positive_int,
- )
-except ImportError:
- import packaging.version
-
- from airflow import __version__ as airflow_version
- from airflow.exceptions import AirflowOptionalProviderFeatureException
-
- base_version = packaging.version.parse(airflow_version).base_version
-
- if packaging.version.parse(base_version) <
packaging.version.parse("2.7.0"):
- raise AirflowOptionalProviderFeatureException(
- "Kubernetes Executor from CNCF Provider should only be used with
Airflow 2.7.0+.\n"
- f"This is Airflow {airflow_version} and Kubernetes and
CeleryKubernetesExecutor are "
- f"available in the 'airflow.executors' package. You should not use
"
- f"the provider's executors in this version of Airflow."
- )
- raise
+from airflow.cli.cli_config import (
+ ARG_DAG_ID,
+ ARG_EXECUTION_DATE,
+ ARG_OUTPUT_PATH,
+ ARG_SUBDIR,
+ ARG_VERBOSE,
+ ActionCommand,
+ Arg,
+ GroupCommand,
+ lazy_load_command,
+ positive_int,
+)
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types
import (
@@ -78,6 +58,8 @@ from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
annotations_to_key
+from airflow.providers.cncf.kubernetes.pod_generator import
PodMutationHookException, PodReconciliationError
+from airflow.stats import Stats
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
diff --git
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 3f544fe2fe..d26df876ef 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -21,7 +21,7 @@ import json
import multiprocessing
import time
from queue import Empty, Queue
-from typing import TYPE_CHECKING, Any, Generic, TypeVar
+from typing import TYPE_CHECKING, Any
from kubernetes import client, watch
from kubernetes.client.rest import ApiException
@@ -36,6 +36,7 @@ from
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.singleton import Singleton
from airflow.utils.state import TaskInstanceState
try:
@@ -60,22 +61,6 @@ if TYPE_CHECKING:
KubernetesWatchType,
)
-# Singleton here is duplicated version of airflow.utils.singleton.Singleton
until
-# min-airflow version is 2.7.0 for the provider. then it can be imported from
airflow.utils.singleton.
-
-T = TypeVar("T")
-
-
-class Singleton(type, Generic[T]):
- """Metaclass that allows to implement singleton pattern."""
-
- _instances: dict[Singleton[T], T] = {}
-
- def __call__(cls: Singleton[T], *args, **kwargs) -> T:
- if cls not in cls._instances:
- cls._instances[cls] = super().__call__(*args, **kwargs)
- return cls._instances[cls]
-
class ResourceVersion(metaclass=Singleton):
"""Singleton for tracking resourceVersion from Kubernetes."""