dstandish commented on code in PR #28523:
URL: https://github.com/apache/airflow/pull/28523#discussion_r1054846400


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -731,3 +736,137 @@ def __exit__(self, exctype, excinst, exctb):
             return True
         else:
             return True
+
+
+class PodNotFoundException(AirflowException):
+    """Expected pod does not exist in kube-api."""
+
+
+class KubernetesPodAsyncOperator(KubernetesPodOperator):
+    """
+    Async (deferring) version of KubernetesPodOperator
+
+    .. warning::
+        By default, logs will not be available in the Airflow Webserver until 
the task completes. However,
+        you can configure ``KubernetesPodAsyncOperator`` to periodically 
resume and fetch logs.  This behavior
+        is controlled by param ``logging_interval``.
+
+    :param poll_interval: interval in seconds to sleep between checking pod 
status
+    :param logging_interval: max time in seconds that task should be in 
deferred state before
+        resuming to fetch latest logs. If ``None``, then the task will remain 
in deferred state until pod
+        is done, and no logs will be visible until that time.
+    """
+
+    def __init__(self, *, poll_interval: int = 5, logging_interval: int | None 
= None, **kwargs: Any):
+        self.poll_interval = poll_interval
+        self.logging_interval = logging_interval
+        super().__init__(**kwargs)
+
+    @staticmethod
+    def raise_for_trigger_status(event: dict[str, Any]) -> None:
+        """Raise exception if pod is not in expected state."""
+        if event["status"] == "error":
+            error_type = event["error_type"]
+            description = event["description"]
+            if error_type == "PodLaunchTimeoutException":
+                raise PodLaunchTimeoutException(description)
+            else:
+                raise AirflowException(description)
+
+    def defer(self, last_log_time: DateTime | None = None, **kwargs: Any) -> 
None:
+        """Defers to ``WaitContainerTrigger`` optionally with last log time."""
+        if kwargs:
+            raise ValueError(
+                f"Received keyword arguments {list(kwargs.keys())} but "
+                f"they are not used in this implementation of `defer`."
+            )
+        super().defer(
+            trigger=WaitContainerTrigger(
+                kubernetes_conn_id=self.kubernetes_conn_id,
+                hook_params={
+                    "cluster_context": self.cluster_context,
+                    "config_file": self.config_file,
+                    "in_cluster": self.in_cluster,
+                },
+                pod_name=self.pod.metadata.name,
+                container_name=self.BASE_CONTAINER_NAME,
+                pod_namespace=self.pod.metadata.namespace,
+                pending_phase_timeout=self.startup_timeout_seconds,
+                poll_interval=self.poll_interval,
+                logging_interval=self.logging_interval,
+                last_log_time=last_log_time,

Review Comment:
   but i guess what i'm saying is if you add it then we will have to deprecate 
it. and i think it's better to just not add it in the first place.  users early 
adopt the provider without upgrading airflow version won't get the ping 
ponging, true, but it's a pretty hacky thing to do anyway no? (i can say this 
cus i implemented it ;) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to