jedcunningham commented on code in PR #32767:
URL: https://github.com/apache/airflow/pull/32767#discussion_r1273852565
##########
airflow/utils/deprecation_tools.py:
##########
@@ -23,25 +23,70 @@
from types import ModuleType
-def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
+def getattr_with_deprecation(
+ imports: dict[str, str],
+ module: str,
+ override_deprecated_classes: dict[str, str],
+ extra_message: str,
+ name: str,
+):
+ """
+ Retrieve the imported attribute from the redirected module ad raises a
deprecation warning.
+
+ :param imports: dict of imports and their redirection for the module
+ :param module: module name
+ :param override_deprecated_classes: override target classes with
deprecated ones. If target class is
+ found in the dictionary, it will be displayed in the warning message.
+ :param name: attribute name
Review Comment:
Missing extra_message.
```suggestion
:param extra_message: extra message to display in the warning or import
error message
:param name: attribute name
```
##########
airflow/utils/deprecation_tools.py:
##########
@@ -23,25 +23,70 @@
from types import ModuleType
-def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
+def getattr_with_deprecation(
+ imports: dict[str, str],
+ module: str,
+ override_deprecated_classes: dict[str, str],
+ extra_message: str,
+ name: str,
+):
+ """
+ Retrieve the imported attribute from the redirected module ad raises a
deprecation warning.
Review Comment:
```suggestion
Retrieve the imported attribute from the redirected module and raise a
deprecation warning.
```
##########
airflow/kubernetes/pre_7_4_0_compatibility/__init__.py:
##########
@@ -15,24 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module is deprecated.
-Please use :mod:`kubernetes.client.models` for `V1ResourceRequirements` and
`Port`.
-"""
from __future__ import annotations
+# All the classes in this module should only be kept for
backwards-compatibility reasons.
+# old cncf.kubernetes providers will use those in their frozen version for
pre-7.4.0 release
import warnings
-from airflow.exceptions import RemovedInAirflow3Warning
-
-# flake8: noqa
-
-with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- from airflow.providers.cncf.kubernetes.backcompat.pod import Port,
Resources
-
warnings.warn(
- "This module is deprecated. Please use `kubernetes.client.models` for
`V1ResourceRequirements` and `Port`.",
- RemovedInAirflow3Warning,
+ "This module is deprecated. The `cncf.kubernetes` provider before version
7.4.0 use this module - "
+ "you should migrate to version of `cncf.kubernetes` to get rid of this
warning. If you "
+ "import the module via `airflow.kubernetes` import, please use
`cncf.kubernetes' "
+ "provider above 7.4.0+ and switch all your imports to use
`apache.airflow.providers.cncf.kubernetes` "
Review Comment:
```suggestion
"provider 7.4.0+ and switch all your imports to use
`apache.airflow.providers.cncf.kubernetes` "
```
Don't need the "+" and "above", either works.
##########
airflow/models/taskinstance.py:
##########
@@ -2290,32 +2277,51 @@ def render_templates(self, context: Context | None =
None) -> Operator:
return original_task
def render_k8s_pod_yaml(self) -> dict | None:
- """Render k8s pod yaml."""
- from kubernetes.client.api_client import ApiClient
-
- from airflow.kubernetes.kube_config import KubeConfig
- from airflow.kubernetes.kubernetes_helper_functions import
create_pod_id # Circular import
- from airflow.kubernetes.pod_generator import PodGenerator
+ """Render the k8s pod yaml."""
+ try:
+ from airflow.providers.cncf.kubernetes.template_rendering import (
+ render_k8s_pod_yaml as render_k8s_pod_yaml_from_provider,
+ )
+ except ImportError:
+ raise RuntimeError(
+ "You need to have 'cncf.kubernetes' provider installed to use
this feature. "
+ "Also rather than calling it directly you should import "
+ "render_k8s_pod_yaml from
airflow.providers.cncf.kubernetes.template_rendering "
+ "and call it with TaskInstance as the first argument."
+ )
+ warnings.warn(
+ "You should not call task_instance.render_k8s_pod_yaml directly.
This method will be removed"
+ "in Airflow 3. Rather than calling it directly you should import "
+ "render_k8s_pod_yaml from
airflow.providers.cncf.kubernetes.template_rendering "
+ "and call it with TaskInstance as the first argument.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return render_k8s_pod_yaml_from_provider(self)
- kube_config = KubeConfig()
- pod = PodGenerator.construct_pod(
- dag_id=self.dag_id,
- run_id=self.run_id,
- task_id=self.task_id,
- map_index=self.map_index,
- date=None,
- pod_id=create_pod_id(self.dag_id, self.task_id),
- try_number=self.try_number,
- kube_image=kube_config.kube_image,
- args=self.command_as_list(),
- pod_override_object=PodGenerator.from_obj(self.executor_config),
- scheduler_job_id="0",
- namespace=kube_config.executor_namespace,
-
base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
- with_mutation_hook=True,
+ @provide_session
+ def get_rendered_k8s_spec(self, session: Session = NEW_SESSION):
+ """Render the k8s pod yaml."""
+ try:
+ from airflow.providers.cncf.kubernetes.template_rendering import (
+ get_rendered_k8s_spec as get_rendered_k8s_spec_from_provider,
+ )
+ except ImportError:
+ raise RuntimeError(
+ "You need to have 'cncf.kubernetes' provider installed to use
this feature. "
Review Comment:
```suggestion
"You need to have the 'cncf.kubernetes' provider installed
to use this feature. "
```
##########
airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -43,8 +43,14 @@ def rand_str(num):
return "".join(secrets.choice(alphanum_lower) for _ in range(num))
-def add_pod_suffix(*, pod_name, rand_len=8, max_len=80):
- """Add random string to pod name while staying under max len."""
+def add_pod_suffix(pod_name: str, rand_len: int = 8, max_len: int = 80) -> str:
Review Comment:
```suggestion
def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = 80)
-> str:
```
Why'd we drop the kwargs only behavior?
##########
tests/cli/commands/test_task_command.py:
##########
@@ -747,13 +749,16 @@ def
test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
"""
import subprocess
- with mock.patch.dict("os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s):
+ with mock.patch.dict(
+ "os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT)
+ ):
with subprocess.Popen(
- args=["airflow", *self.task_args, "-S", self.dag_path],
+ args=[sys.executable, "-m", "airflow", *self.task_args, "-S",
self.dag_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as process:
output, err = process.communicate()
+ print(output)
Review Comment:
```suggestion
```
Leftover debugging?
##########
docs/apache-airflow/core-concepts/executor/local_kubernetes.rst:
##########
@@ -21,7 +21,14 @@
LocalKubernetes Executor
=========================
-The
:class:`~airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor`
allows users
+.. note::
+
+ As of Airflow 2.7.0, you need to install ``kubernetes`` provider package
to use
Review Comment:
```suggestion
As of Airflow 2.7.0, you need to install the ``kubernetes`` provider
package to use
```
##########
airflow/utils/deprecation_tools.py:
##########
@@ -23,25 +23,70 @@
from types import ModuleType
-def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
+def getattr_with_deprecation(
+ imports: dict[str, str],
+ module: str,
+ override_deprecated_classes: dict[str, str],
+ extra_message: str,
+ name: str,
+):
+ """
+ Retrieve the imported attribute from the redirected module ad raises a
deprecation warning.
+
+ :param imports: dict of imports and their redirection for the module
+ :param module: module name
+ :param override_deprecated_classes: override target classes with
deprecated ones. If target class is
+ found in the dictionary, it will be displayed in the warning message.
+ :param name: attribute name
+ :return:
+ """
target_class_full_name = imports.get(name)
if not target_class_full_name:
raise AttributeError(f"The module `{module!r}` has no attribute
`{name!r}`")
+ warning_class_name = target_class_full_name
+ if override_deprecated_classes and name in override_deprecated_classes:
+ warning_class_name = override_deprecated_classes[name]
warnings.warn(
- f"The `{module}.{name}` class is deprecated. Please use
`{target_class_full_name!r}`.",
+ f"The `{module}.{name}` class is deprecated. Please use
`{warning_class_name!r}`.{extra_message}.",
DeprecationWarning,
stacklevel=2,
)
new_module, new_class_name = target_class_full_name.rsplit(".", 1)
- return getattr(importlib.import_module(new_module), new_class_name)
+ try:
+ return getattr(importlib.import_module(new_module), new_class_name)
+ except ImportError as e:
+ raise ImportError(
+ f"Could not import `{new_module}.{new_class_name}`"
+ f" while trying to import `{module}.{name}.`{extra_message}."
+ ) from e
+
+def add_deprecated_classes(
+ module_imports: dict[str, dict[str, str]],
+ package: str,
+ override_deprecated_classes: dict[str, dict[str, str]] | None = None,
+ extra_message: str | None = None,
+):
+ """
+ Add deprecated class PEP-563 imports and warnings modules to the package.
-def add_deprecated_classes(module_imports: dict[str, dict[str, str]], package:
str):
+ :param module_imports: imports to use
+ :param package: package name
+ :param override_deprecated_classes: override target classes with
deprecated ones. If module +
+ target class is found in the dictionary, it will be displayed in the
warning message.
+ :param extra_message: extra message to display in the warning or import
error message
+ """
for module_name, imports in module_imports.items():
full_module_name = f"{package}.{module_name}"
module_type = ModuleType(full_module_name)
# Mypy is not able to derive the right function signature
https://github.com/python/mypy/issues/2427
module_type.__getattr__ = functools.partial( # type:
ignore[assignment]
- getattr_with_deprecation, imports, full_module_name
+ getattr_with_deprecation,
+ imports,
+ full_module_name,
+ override_deprecated_classes[module_name]
+ if override_deprecated_classes and module_name in
override_deprecated_classes
+ else dict(),
Review Comment:
Might be a little cleaner to do this out of the partial call.
##########
airflow/models/taskinstance.py:
##########
@@ -2290,32 +2277,51 @@ def render_templates(self, context: Context | None =
None) -> Operator:
return original_task
def render_k8s_pod_yaml(self) -> dict | None:
- """Render k8s pod yaml."""
- from kubernetes.client.api_client import ApiClient
-
- from airflow.kubernetes.kube_config import KubeConfig
- from airflow.kubernetes.kubernetes_helper_functions import
create_pod_id # Circular import
- from airflow.kubernetes.pod_generator import PodGenerator
+ """Render the k8s pod yaml."""
+ try:
+ from airflow.providers.cncf.kubernetes.template_rendering import (
+ render_k8s_pod_yaml as render_k8s_pod_yaml_from_provider,
+ )
+ except ImportError:
+ raise RuntimeError(
+ "You need to have 'cncf.kubernetes' provider installed to use
this feature. "
Review Comment:
```suggestion
"You need to have the 'cncf.kubernetes' provider installed
to use this feature. "
```
##########
airflow/kubernetes/pre_7_4_0_compatibility/__init__.py:
##########
@@ -15,24 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module is deprecated.
-Please use :mod:`kubernetes.client.models` for `V1ResourceRequirements` and
`Port`.
-"""
from __future__ import annotations
+# All the classes in this module should only be kept for
backwards-compatibility reasons.
+# old cncf.kubernetes providers will use those in their frozen version for
pre-7.4.0 release
import warnings
-from airflow.exceptions import RemovedInAirflow3Warning
-
-# flake8: noqa
-
-with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- from airflow.providers.cncf.kubernetes.backcompat.pod import Port,
Resources
-
warnings.warn(
- "This module is deprecated. Please use `kubernetes.client.models` for
`V1ResourceRequirements` and `Port`.",
- RemovedInAirflow3Warning,
+ "This module is deprecated. The `cncf.kubernetes` provider before version
7.4.0 use this module - "
+ "you should migrate to version of `cncf.kubernetes` to get rid of this
warning. If you "
Review Comment:
```suggestion
"This module is deprecated. The `cncf.kubernetes` provider before
version 7.4.0 uses this module - "
"you should migrate to a newer version of `cncf.kubernetes` to get rid
of this warning. If you "
```
##########
airflow/config_templates/__init__.py:
##########
@@ -25,4 +25,6 @@
},
}
-add_deprecated_classes(__deprecated_classes, __name__)
+add_deprecated_classes(
+ __deprecated_classes, __name__, {}, " The `celery` provider must be >=
3.3.0 for that."
Review Comment:
Feels a little weird requiring the caller to know it needs to prepend a
space. Can we add that in the function instead?
##########
airflow/providers/cncf/kubernetes/provider.yaml:
##########
@@ -124,3 +125,212 @@ connection-types:
task-decorators:
- class-name:
airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task
name: kubernetes
+
+config:
+ local_kubernetes_executor:
+ description: |
+ This section only applies if you are using the
``LocalKubernetesExecutor`` in
+ ``[core]`` section above
+ options:
+ kubernetes_queue:
+ description: |
+ Define when to send a task to ``KubernetesExecutor`` when using
``LocalKubernetesExecutor``.
+ When the queue of a task is the value of ``kubernetes_queue``
(default ``kubernetes``),
+ the task is executed via ``KubernetesExecutor``,
+ otherwise via ``LocalExecutor``
+ version_added: ~
+ type: string
+ example: ~
+ default: "kubernetes"
+ kubernetes_executor:
+ description: ~
+ options:
+ api_client_retry_configuration:
+ description: |
+ Kwargs to override the default urllib3 Retry used in the kubernetes
API client
+ version_added: ~
+ type: string
+ example: '{ "total": 3, "backoff_factor": 0.5 }'
+ default: ""
+ logs_task_metadata:
+ description: |
+ Flag to control the information added to kubernetes executor logs
for better traceability
+ version_added: 7.4.0
Review Comment:
This feels a little weird to me. I know this was just added in core, but all
of these were just added into the provider. `api_client_retry_configuration`
above is a good example. That was just added in 2.6.0 itself.
I wonder if a "fresh clean slate" is better?
##########
docs/apache-airflow/core-concepts/executor/kubernetes.rst:
##########
@@ -21,6 +21,14 @@
Kubernetes Executor
===================
+.. note::
+
+ As of Airflow 2.7.0, you need to install ``kubernetes`` provider package
to use
Review Comment:
```suggestion
As of Airflow 2.7.0, you need to install the ``kubernetes`` provider
package to use
```
##########
docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst:
##########
@@ -21,6 +21,14 @@
CeleryKubernetes Executor
=========================
+.. note::
+
+ As of Airflow 2.7.0, you need to install both ``celery`` and
``kubernetes`` provider package to use
Review Comment:
```suggestion
As of Airflow 2.7.0, you need to install both the ``celery`` and
``kubernetes`` provider packages to use
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]