dstandish commented on code in PR #28523:
URL: https://github.com/apache/airflow/pull/28523#discussion_r1054683380


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -731,3 +736,137 @@ def __exit__(self, exctype, excinst, exctb):
             return True
         else:
             return True
+
+
+class PodNotFoundException(AirflowException):
+    """Expected pod does not exist in kube-api."""
+
+
+class KubernetesPodAsyncOperator(KubernetesPodOperator):
+    """
+    Async (deferring) version of KubernetesPodOperator
+
+    .. warning::
+        By default, logs will not be available in the Airflow Webserver until the task completes. However,
+        you can configure ``KubernetesPodAsyncOperator`` to periodically resume and fetch logs.  This behavior
+        is controlled by param ``logging_interval``.
+
+    :param poll_interval: interval in seconds to sleep between checking pod status
+    :param logging_interval: max time in seconds that task should be in deferred state before
+        resuming to fetch latest logs. If ``None``, then the task will remain in deferred state until pod
+        is done, and no logs will be visible until that time.
+    """
+
+    def __init__(self, *, poll_interval: int = 5, logging_interval: int | None = None, **kwargs: Any):
+        self.poll_interval = poll_interval
+        self.logging_interval = logging_interval
+        super().__init__(**kwargs)
+
+    @staticmethod
+    def raise_for_trigger_status(event: dict[str, Any]) -> None:
+        """Raise exception if pod is not in expected state."""
+        if event["status"] == "error":
+            error_type = event["error_type"]
+            description = event["description"]
+            if error_type == "PodLaunchTimeoutException":
+                raise PodLaunchTimeoutException(description)
+            else:
+                raise AirflowException(description)
+
+    def defer(self, last_log_time: DateTime | None = None, **kwargs: Any) -> None:
+        """Defers to ``WaitContainerTrigger`` optionally with last log time."""
+        if kwargs:
+            raise ValueError(
+                f"Received keyword arguments {list(kwargs.keys())} but "
+                f"they are not used in this implementation of `defer`."
+            )
+        super().defer(
+            trigger=WaitContainerTrigger(
+                kubernetes_conn_id=self.kubernetes_conn_id,
+                hook_params={
+                    "cluster_context": self.cluster_context,
+                    "config_file": self.config_file,
+                    "in_cluster": self.in_cluster,
+                },
+                pod_name=self.pod.metadata.name,
+                container_name=self.BASE_CONTAINER_NAME,
+                pod_namespace=self.pod.metadata.namespace,
+                pending_phase_timeout=self.startup_timeout_seconds,
+                poll_interval=self.poll_interval,
+                logging_interval=self.logging_interval,
+                last_log_time=last_log_time,

Review Comment:
   Very soon, we will have triggerer logs in the web UI in Airflow, so it might be better to remove the support for ping-ponging, i.e. logging interval / last log time.
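
   For readers following along, the ping-pong behavior under discussion can be simulated with a short sketch (a toy model of the resume/defer cycle only; `ping_pong_fetches` and its arguments are illustrative names, not the provider's API):

   ```python
   def ping_pong_fetches(log_lines, done_at, logging_interval):
       """Toy model of the defer/resume "ping-pong" log-fetch cycle.

       log_lines: list of (t, line) tuples, t in seconds since task start.
       done_at: time (seconds) at which the container finishes.
       logging_interval: seconds the task stays deferred before resuming
           to fetch logs; None means a single fetch once the pod is done.
       Returns the batches of log lines each resume would surface.
       """
       if logging_interval is None:
           # No ping-ponging: logs only become visible after completion.
           return [[line for _, line in log_lines]]
       batches = []
       last_log_time = 0
       resume_at = logging_interval
       while resume_at < done_at:
           # Task resumes from deferral to fetch logs since the last resume,
           # then defers again with the new last_log_time.
           batches.append([line for t, line in log_lines if last_log_time <= t < resume_at])
           last_log_time = resume_at
           resume_at += logging_interval
       # Final resume once the container is done: fetch the remaining logs.
       batches.append([line for t, line in log_lines if last_log_time <= t])
       return batches
   ```

   This is the mechanism the `logging_interval` / `last_log_time` parameters drive; removing it would collapse the loop to the `logging_interval is None` branch.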



##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -406,3 +408,102 @@ def _get_bool(val) -> bool | None:
         elif val.strip().lower() == "false":
             return False
     return None
+
+
+class KubernetesAsyncHook(KubernetesHook):
+    """
+    Creates Async Kubernetes API connection.
+
+    - use in cluster configuration by using extra field ``in_cluster`` in connection
+    - use custom config by providing path to the file using extra field ``kube_config_path`` in connection
+    - use custom configuration by providing content of kubeconfig file via
+        extra field ``kube_config`` in connection
+    - use default config by providing no extras
+
+    This hook checks for configuration options in the above order. Once an option is present, it will
+        use this configuration.
+
+    .. seealso::
+        For more information about Kubernetes connection:
+        :doc:`/connections/kubernetes`
+
+    :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
+        to Kubernetes cluster.
+    :param client_configuration: Optional dictionary of client configuration params.
+        Passed on to kubernetes client.
+    :param cluster_context: Optionally specify a context to use (e.g. if you have multiple
+        in your kubeconfig).
+    :param config_file: Path to kubeconfig file.
+    :param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
+    :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
+    :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
+    """
+
+    async def _load_config(self) -> client.ApiClient:
+        """Load config to interact with Kubernetes."""
+        if self.conn_id:
+            connection = await sync_to_async(self.get_connection)(self.conn_id)
+            extras = connection.extra_dejson
+        else:
+            extras = {}
+        in_cluster = self._coalesce_param(self.in_cluster, extras.get("extra__kubernetes__in_cluster"))
+        cluster_context = self._coalesce_param(
+            self.cluster_context, extras.get("extra__kubernetes__cluster_context")

Review Comment:
   These prefixes `extra__kubernetes__` are deprecated.  There is a helper method `_get_field` you can use for backcompat.
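
   For illustration, a backcompat field lookup of the kind the reviewer mentions might look roughly like this (a sketch of the pattern only; the real `_get_field` lives on the hook and its exact behavior may differ):

   ```python
   def get_field(extras: dict, field_name: str, conn_type: str = "kubernetes"):
       """Look up a connection extra, preferring the un-prefixed key and
       falling back to the deprecated ``extra__<conn_type>__`` prefix.

       Illustrative only -- not the provider's actual implementation.
       """
       if field_name in extras:
           return extras[field_name]
       # Fall back to the old prefixed form for backward compatibility.
       return extras.get(f"extra__{conn_type}__{field_name}")
   ```

   With such a helper, the hook code above would read along the lines of `in_cluster = self._coalesce_param(self.in_cluster, self._get_field("in_cluster"))` instead of spelling out the deprecated prefix.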



##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -731,3 +736,137 @@ def __exit__(self, exctype, excinst, exctb):
             return True
         else:
             return True
+
+
+class PodNotFoundException(AirflowException):
+    """Expected pod does not exist in kube-api."""
+
+
+class KubernetesPodAsyncOperator(KubernetesPodOperator):
+    """
+    Async (deferring) version of KubernetesPodOperator
+
+    .. warning::
+        By default, logs will not be available in the Airflow Webserver until the task completes. However,
+        you can configure ``KubernetesPodAsyncOperator`` to periodically resume and fetch logs.  This behavior
+        is controlled by param ``logging_interval``.
+
+    :param poll_interval: interval in seconds to sleep between checking pod status
+    :param logging_interval: max time in seconds that task should be in deferred state before
+        resuming to fetch latest logs. If ``None``, then the task will remain in deferred state until pod
+        is done, and no logs will be visible until that time.
+    """
+
+    def __init__(self, *, poll_interval: int = 5, logging_interval: int | None = None, **kwargs: Any):
+        self.poll_interval = poll_interval
+        self.logging_interval = logging_interval
+        super().__init__(**kwargs)
+
+    @staticmethod
+    def raise_for_trigger_status(event: dict[str, Any]) -> None:
+        """Raise exception if pod is not in expected state."""
+        if event["status"] == "error":
+            error_type = event["error_type"]
+            description = event["description"]
+            if error_type == "PodLaunchTimeoutException":
+                raise PodLaunchTimeoutException(description)
+            else:
+                raise AirflowException(description)
+
+    def defer(self, last_log_time: DateTime | None = None, **kwargs: Any) -> None:
+        """Defers to ``WaitContainerTrigger`` optionally with last log time."""
+        if kwargs:
+            raise ValueError(
+                f"Received keyword arguments {list(kwargs.keys())} but "
+                f"they are not used in this implementation of `defer`."
+            )
+        super().defer(
+            trigger=WaitContainerTrigger(
+                kubernetes_conn_id=self.kubernetes_conn_id,
+                hook_params={
+                    "cluster_context": self.cluster_context,
+                    "config_file": self.config_file,
+                    "in_cluster": self.in_cluster,
+                },
+                pod_name=self.pod.metadata.name,
+                container_name=self.BASE_CONTAINER_NAME,
+                pod_namespace=self.pod.metadata.namespace,
+                pending_phase_timeout=self.startup_timeout_seconds,
+                poll_interval=self.poll_interval,
+                logging_interval=self.logging_interval,
+                last_log_time=last_log_time,
+            ),
+            method_name=self.trigger_reentry.__name__,
+        )
+
+    def execute(self, context: Context) -> None:
+        self.pod_request_obj = self.build_pod_request_obj(context)
+        self.pod: k8s.V1Pod = self.get_or_create_pod(self.pod_request_obj, context)
+        self.defer()
+
+    def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:

Review Comment:
   Since this is a new operator, we don't need any deprecated methods.



##########
airflow/providers/cncf/kubernetes/triggers/wait_container.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   So the names are driven by what the things do.  This is a trigger that waits until the container is done.  It is used by KPO now, but it doesn't mean that something else couldn't use it.
   
   Then, think about when you are reading KPO code:
   if you see "now let's defer to wait container trigger", you know what it is doing;
   if you see "now let's defer to kubernetes pod operator trigger", then you don't know quite as well what it's doing.
   



##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -406,3 +408,102 @@ def _get_bool(val) -> bool | None:
         elif val.strip().lower() == "false":
             return False
     return None
+
+
+class KubernetesAsyncHook(KubernetesHook):
+    """
+    Creates Async Kubernetes API connection.
+
+    - use in cluster configuration by using extra field ``in_cluster`` in connection
+    - use custom config by providing path to the file using extra field ``kube_config_path`` in connection
+    - use custom configuration by providing content of kubeconfig file via
+        extra field ``kube_config`` in connection
+    - use default config by providing no extras
+
+    This hook checks for configuration options in the above order. Once an option is present, it will
+        use this configuration.
+
+    .. seealso::
+        For more information about Kubernetes connection:
+        :doc:`/connections/kubernetes`
+
+    :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
+        to Kubernetes cluster.
+    :param client_configuration: Optional dictionary of client configuration params.
+        Passed on to kubernetes client.
+    :param cluster_context: Optionally specify a context to use (e.g. if you have multiple
+        in your kubeconfig).
+    :param config_file: Path to kubeconfig file.
+    :param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
+    :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
+    :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
+    """
+
+    async def _load_config(self) -> client.ApiClient:
+        """Load config to interact with Kubernetes."""
+        if self.conn_id:
+            connection = await sync_to_async(self.get_connection)(self.conn_id)
+            extras = connection.extra_dejson
+        else:
+            extras = {}
+        in_cluster = self._coalesce_param(self.in_cluster, extras.get("extra__kubernetes__in_cluster"))
+        cluster_context = self._coalesce_param(
+            self.cluster_context, extras.get("extra__kubernetes__cluster_context")
+        )
+        )
+        kubeconfig_path = self._coalesce_param(
+            self.config_file, extras.get("extra__kubernetes__kube_config_path")
+        )
+        kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+        num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])
+
+        if num_selected_configuration > 1:
+            raise AirflowException(
+                "Invalid connection configuration. Options kube_config_path, "
+                "kube_config, in_cluster are mutually exclusive. "
+                "You can only use one option at a time."
+            )
+        if in_cluster:
+            self.log.debug("loading kube_config from: in_cluster configuration")
+            config.load_incluster_config()
+            return client.ApiClient()
+
+        if kubeconfig_path is not None:
+            self.log.debug("loading kube_config from: %s", kubeconfig_path)
+            await config.load_kube_config(
+                config_file=kubeconfig_path,
+                client_configuration=self.client_configuration,
+                context=cluster_context,
+            )

Review Comment:
   you're referring to `async def _load_config(self)` right?
   
   well it does do file operations which are technically blocking (even though they might be fast)
   
   so you might as well handle them asyncly ... just cus... why not?
   
   but now that you mention it, one thing that is a bit curious is ... how are we able to await `config.load_kube_config`, because it looks like that is not an async method... i must be missing something
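
   A possible resolution: the async hook presumably imports `config` from the `kubernetes_asyncio` package rather than the sync `kubernetes` package, and in `kubernetes_asyncio` the documented usage is `await config.load_kube_config(...)`, i.e. it is a coroutine function there. Separately, the general pattern of keeping blocking file I/O off the event loop can be sketched with just the standard library (a generic illustration, not the provider's code):

   ```python
   import asyncio

   def read_kubeconfig(path: str) -> str:
       # Plain blocking file read -- fine in sync code, but it would
       # stall an asyncio event loop if called directly in a coroutine.
       with open(path) as f:
           return f.read()

   async def load_config_async(path: str) -> str:
       # Run the blocking read in a worker thread so the event loop
       # stays free -- the generic way to "handle it asyncly".
       return await asyncio.to_thread(read_kubeconfig, path)
   ```

   `asyncio.to_thread` (Python 3.9+) is sugar over `loop.run_in_executor(None, ...)`; either keeps fast-but-blocking filesystem work from pausing other coroutines.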



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
