jedcunningham commented on code in PR #32767:
URL: https://github.com/apache/airflow/pull/32767#discussion_r1273852565


##########
airflow/utils/deprecation_tools.py:
##########
@@ -23,25 +23,70 @@
 from types import ModuleType
 
 
-def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
+def getattr_with_deprecation(
+    imports: dict[str, str],
+    module: str,
+    override_deprecated_classes: dict[str, str],
+    extra_message: str,
+    name: str,
+):
+    """
+    Retrieve the imported attribute from the redirected module ad raises a 
deprecation warning.
+
+    :param imports: dict of imports and their redirection for the module
+    :param module: module name
+    :param override_deprecated_classes: override target classes with 
deprecated ones. If target class is
+       found in the dictionary, it will be displayed in the warning message.
+    :param name: attribute name

Review Comment:
   Missing extra_message.
   
   ```suggestion
       :param extra_message: extra message to display in the warning or import 
error message
       :param name: attribute name
   ```



##########
airflow/utils/deprecation_tools.py:
##########
@@ -23,25 +23,70 @@
 from types import ModuleType
 
 
-def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
+def getattr_with_deprecation(
+    imports: dict[str, str],
+    module: str,
+    override_deprecated_classes: dict[str, str],
+    extra_message: str,
+    name: str,
+):
+    """
+    Retrieve the imported attribute from the redirected module ad raises a 
deprecation warning.

Review Comment:
   ```suggestion
       Retrieve the imported attribute from the redirected module and raise a 
deprecation warning.
   ```



##########
airflow/kubernetes/pre_7_4_0_compatibility/__init__.py:
##########
@@ -15,24 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module is deprecated.
-Please use :mod:`kubernetes.client.models` for `V1ResourceRequirements` and 
`Port`.
-"""
 from __future__ import annotations
 
+# All the classes in this module should only be kept for 
backwards-compatibility reasons.
+# old cncf.kubernetes providers will use those in their frozen version for 
pre-7.4.0 release
 import warnings
 
-from airflow.exceptions import RemovedInAirflow3Warning
-
-# flake8: noqa
-
-with warnings.catch_warnings():
-    warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-    from airflow.providers.cncf.kubernetes.backcompat.pod import Port, 
Resources
-
 warnings.warn(
-    "This module is deprecated. Please use `kubernetes.client.models` for 
`V1ResourceRequirements` and `Port`.",
-    RemovedInAirflow3Warning,
+    "This module is deprecated. The `cncf.kubernetes` provider before version 
7.4.0 use this module - "
+    "you should migrate to version of `cncf.kubernetes` to get rid of this 
warning. If you "
+    "import the module via `airflow.kubernetes` import, please use 
`cncf.kubernetes' "
+    "provider above 7.4.0+ and switch all your imports to use 
`apache.airflow.providers.cncf.kubernetes` "

Review Comment:
   ```suggestion
       "provider 7.4.0+ and switch all your imports to use 
`apache.airflow.providers.cncf.kubernetes` "
   ```
   
   Don't need the "+" and "above", either works.



##########
airflow/models/taskinstance.py:
##########
@@ -2290,32 +2277,51 @@ def render_templates(self, context: Context | None = 
None) -> Operator:
         return original_task
 
     def render_k8s_pod_yaml(self) -> dict | None:
-        """Render k8s pod yaml."""
-        from kubernetes.client.api_client import ApiClient
-
-        from airflow.kubernetes.kube_config import KubeConfig
-        from airflow.kubernetes.kubernetes_helper_functions import 
create_pod_id  # Circular import
-        from airflow.kubernetes.pod_generator import PodGenerator
+        """Render the k8s pod yaml."""
+        try:
+            from airflow.providers.cncf.kubernetes.template_rendering import (
+                render_k8s_pod_yaml as render_k8s_pod_yaml_from_provider,
+            )
+        except ImportError:
+            raise RuntimeError(
+                "You need to have 'cncf.kubernetes' provider installed to use 
this feature. "
+                "Also rather than calling it directly you should import "
+                "render_k8s_pod_yaml from 
airflow.providers.cncf.kubernetes.template_rendering "
+                "and call it with TaskInstance as the first argument."
+            )
+        warnings.warn(
+            "You should not call task_instance.render_k8s_pod_yaml directly. 
This method will be removed"
+            "in Airflow 3. Rather than calling it directly you should import "
+            "render_k8s_pod_yaml from 
airflow.providers.cncf.kubernetes.template_rendering "
+            "and call it with TaskInstance as the first argument.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return render_k8s_pod_yaml_from_provider(self)
 
-        kube_config = KubeConfig()
-        pod = PodGenerator.construct_pod(
-            dag_id=self.dag_id,
-            run_id=self.run_id,
-            task_id=self.task_id,
-            map_index=self.map_index,
-            date=None,
-            pod_id=create_pod_id(self.dag_id, self.task_id),
-            try_number=self.try_number,
-            kube_image=kube_config.kube_image,
-            args=self.command_as_list(),
-            pod_override_object=PodGenerator.from_obj(self.executor_config),
-            scheduler_job_id="0",
-            namespace=kube_config.executor_namespace,
-            
base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
-            with_mutation_hook=True,
+    @provide_session
+    def get_rendered_k8s_spec(self, session: Session = NEW_SESSION):
+        """Render the k8s pod yaml."""
+        try:
+            from airflow.providers.cncf.kubernetes.template_rendering import (
+                get_rendered_k8s_spec as get_rendered_k8s_spec_from_provider,
+            )
+        except ImportError:
+            raise RuntimeError(
+                "You need to have 'cncf.kubernetes' provider installed to use 
this feature. "

Review Comment:
   ```suggestion
                   "You need to have the 'cncf.kubernetes' provider installed 
to use this feature. "
   ```



##########
airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -43,8 +43,14 @@ def rand_str(num):
     return "".join(secrets.choice(alphanum_lower) for _ in range(num))
 
 
-def add_pod_suffix(*, pod_name, rand_len=8, max_len=80):
-    """Add random string to pod name while staying under max len."""
+def add_pod_suffix(pod_name: str, rand_len: int = 8, max_len: int = 80) -> str:

Review Comment:
   ```suggestion
   def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = 80) 
-> str:
   ```
   
   Why'd we drop the kwargs only behavior?



##########
tests/cli/commands/test_task_command.py:
##########
@@ -747,13 +749,16 @@ def 
test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
         """
         import subprocess
 
-        with mock.patch.dict("os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s):
+        with mock.patch.dict(
+            "os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s, 
PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT)
+        ):
             with subprocess.Popen(
-                args=["airflow", *self.task_args, "-S", self.dag_path],
+                args=[sys.executable, "-m", "airflow", *self.task_args, "-S", 
self.dag_path],
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
             ) as process:
                 output, err = process.communicate()
+        print(output)

Review Comment:
   ```suggestion
   ```
   
   Leftover debugging?



##########
docs/apache-airflow/core-concepts/executor/local_kubernetes.rst:
##########
@@ -21,7 +21,14 @@
 LocalKubernetes Executor
 =========================
 
-The 
:class:`~airflow.executors.local_kubernetes_executor.LocalKubernetesExecutor` 
allows users
+.. note::
+
+    As of Airflow 2.7.0, you need to install ``kubernetes`` provider package 
to use

Review Comment:
   ```suggestion
       As of Airflow 2.7.0, you need to install the ``kubernetes`` provider 
package to use
   ```



##########
airflow/utils/deprecation_tools.py:
##########
@@ -23,25 +23,70 @@
 from types import ModuleType
 
 
-def getattr_with_deprecation(imports: dict[str, str], module: str, name: str):
+def getattr_with_deprecation(
+    imports: dict[str, str],
+    module: str,
+    override_deprecated_classes: dict[str, str],
+    extra_message: str,
+    name: str,
+):
+    """
+    Retrieve the imported attribute from the redirected module ad raises a 
deprecation warning.
+
+    :param imports: dict of imports and their redirection for the module
+    :param module: module name
+    :param override_deprecated_classes: override target classes with 
deprecated ones. If target class is
+       found in the dictionary, it will be displayed in the warning message.
+    :param name: attribute name
+    :return:
+    """
     target_class_full_name = imports.get(name)
     if not target_class_full_name:
         raise AttributeError(f"The module `{module!r}` has no attribute 
`{name!r}`")
+    warning_class_name = target_class_full_name
+    if override_deprecated_classes and name in override_deprecated_classes:
+        warning_class_name = override_deprecated_classes[name]
     warnings.warn(
-        f"The `{module}.{name}` class is deprecated. Please use 
`{target_class_full_name!r}`.",
+        f"The `{module}.{name}` class is deprecated. Please use 
`{warning_class_name!r}`.{extra_message}.",
         DeprecationWarning,
         stacklevel=2,
     )
     new_module, new_class_name = target_class_full_name.rsplit(".", 1)
-    return getattr(importlib.import_module(new_module), new_class_name)
+    try:
+        return getattr(importlib.import_module(new_module), new_class_name)
+    except ImportError as e:
+        raise ImportError(
+            f"Could not import `{new_module}.{new_class_name}`"
+            f" while trying to import `{module}.{name}.`{extra_message}."
+        ) from e
+
 
+def add_deprecated_classes(
+    module_imports: dict[str, dict[str, str]],
+    package: str,
+    override_deprecated_classes: dict[str, dict[str, str]] | None = None,
+    extra_message: str | None = None,
+):
+    """
+    Add deprecated class PEP-563 imports and warnings modules to the package.
 
-def add_deprecated_classes(module_imports: dict[str, dict[str, str]], package: 
str):
+    :param module_imports: imports to use
+    :param package: package name
+    :param override_deprecated_classes: override target classes with 
deprecated ones. If module +
+       target class is found in the dictionary, it will be displayed in the 
warning message.
+    :param extra_message: extra message to display in the warning or import 
error message
+    """
     for module_name, imports in module_imports.items():
         full_module_name = f"{package}.{module_name}"
         module_type = ModuleType(full_module_name)
         # Mypy is not able to derive the right function signature 
https://github.com/python/mypy/issues/2427
         module_type.__getattr__ = functools.partial(  # type: 
ignore[assignment]
-            getattr_with_deprecation, imports, full_module_name
+            getattr_with_deprecation,
+            imports,
+            full_module_name,
+            override_deprecated_classes[module_name]
+            if override_deprecated_classes and module_name in 
override_deprecated_classes
+            else dict(),

Review Comment:
   Might be a little cleaner to do this out of the partial call.



##########
airflow/models/taskinstance.py:
##########
@@ -2290,32 +2277,51 @@ def render_templates(self, context: Context | None = 
None) -> Operator:
         return original_task
 
     def render_k8s_pod_yaml(self) -> dict | None:
-        """Render k8s pod yaml."""
-        from kubernetes.client.api_client import ApiClient
-
-        from airflow.kubernetes.kube_config import KubeConfig
-        from airflow.kubernetes.kubernetes_helper_functions import 
create_pod_id  # Circular import
-        from airflow.kubernetes.pod_generator import PodGenerator
+        """Render the k8s pod yaml."""
+        try:
+            from airflow.providers.cncf.kubernetes.template_rendering import (
+                render_k8s_pod_yaml as render_k8s_pod_yaml_from_provider,
+            )
+        except ImportError:
+            raise RuntimeError(
+                "You need to have 'cncf.kubernetes' provider installed to use 
this feature. "

Review Comment:
   ```suggestion
                   "You need to have the 'cncf.kubernetes' provider installed 
to use this feature. "
   ```



##########
airflow/kubernetes/pre_7_4_0_compatibility/__init__.py:
##########
@@ -15,24 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module is deprecated.
-Please use :mod:`kubernetes.client.models` for `V1ResourceRequirements` and 
`Port`.
-"""
 from __future__ import annotations
 
+# All the classes in this module should only be kept for 
backwards-compatibility reasons.
+# old cncf.kubernetes providers will use those in their frozen version for 
pre-7.4.0 release
 import warnings
 
-from airflow.exceptions import RemovedInAirflow3Warning
-
-# flake8: noqa
-
-with warnings.catch_warnings():
-    warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-    from airflow.providers.cncf.kubernetes.backcompat.pod import Port, 
Resources
-
 warnings.warn(
-    "This module is deprecated. Please use `kubernetes.client.models` for 
`V1ResourceRequirements` and `Port`.",
-    RemovedInAirflow3Warning,
+    "This module is deprecated. The `cncf.kubernetes` provider before version 
7.4.0 use this module - "
+    "you should migrate to version of `cncf.kubernetes` to get rid of this 
warning. If you "

Review Comment:
   ```suggestion
       "This module is deprecated. The `cncf.kubernetes` provider before 
version 7.4.0 uses this module - "
       "you should migrate to a newer version of `cncf.kubernetes` to get rid 
of this warning. If you "
   ```



##########
airflow/config_templates/__init__.py:
##########
@@ -25,4 +25,6 @@
     },
 }
 
-add_deprecated_classes(__deprecated_classes, __name__)
+add_deprecated_classes(
+    __deprecated_classes, __name__, {}, " The `celery` provider must be >= 
3.3.0 for that."

Review Comment:
   Feels a little weird requiring the caller to know it needs to prepend a 
space. Can we add that in the function instead?



##########
airflow/providers/cncf/kubernetes/provider.yaml:
##########
@@ -124,3 +125,212 @@ connection-types:
 task-decorators:
   - class-name: 
airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task
     name: kubernetes
+
+config:
+  local_kubernetes_executor:
+    description: |
+      This section only applies if you are using the 
``LocalKubernetesExecutor`` in
+      ``[core]`` section above
+    options:
+      kubernetes_queue:
+        description: |
+          Define when to send a task to ``KubernetesExecutor`` when using 
``LocalKubernetesExecutor``.
+          When the queue of a task is the value of ``kubernetes_queue`` 
(default ``kubernetes``),
+          the task is executed via ``KubernetesExecutor``,
+          otherwise via ``LocalExecutor``
+        version_added: ~
+        type: string
+        example: ~
+        default: "kubernetes"
+  kubernetes_executor:
+    description: ~
+    options:
+      api_client_retry_configuration:
+        description: |
+          Kwargs to override the default urllib3 Retry used in the kubernetes 
API client
+        version_added: ~
+        type: string
+        example: '{ "total": 3, "backoff_factor": 0.5 }'
+        default: ""
+      logs_task_metadata:
+        description: |
+          Flag to control the information added to kubernetes executor logs 
for better traceability
+        version_added: 7.4.0

Review Comment:
   This feels a little weird to me. I know this was just added in core, but all 
of these were just added into the provider. `api_client_retry_configuration` 
above is a good example. That was just added in 2.6.0 itself.
   
   I wonder if a "fresh clean slate" is better?



##########
docs/apache-airflow/core-concepts/executor/kubernetes.rst:
##########
@@ -21,6 +21,14 @@
 Kubernetes Executor
 ===================
 
+.. note::
+
+    As of Airflow 2.7.0, you need to install ``kubernetes`` provider package 
to use

Review Comment:
   ```suggestion
       As of Airflow 2.7.0, you need to install the ``kubernetes`` provider 
package to use
   ```



##########
docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst:
##########
@@ -21,6 +21,14 @@
 CeleryKubernetes Executor
 =========================
 
+.. note::
+
+    As of Airflow 2.7.0, you need to install both ``celery`` and 
``kubernetes`` provider package to use

Review Comment:
   ```suggestion
       As of Airflow 2.7.0, you need to install both the ``celery`` and 
``kubernetes`` provider packages to use
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to