amoghrajesh commented on code in PR #44357:
URL: https://github.com/apache/airflow/pull/44357#discussion_r1885696607


##########
providers/src/airflow/providers/cncf/kubernetes/callbacks.py:
##########
@@ -50,7 +54,27 @@ def on_sync_client_creation(*, client: k8s.CoreV1Api, 
**kwargs) -> None:
         pass
 
     @staticmethod
-    def on_pod_creation(*, pod: k8s.V1Pod, client: client_type, mode: str, 
**kwargs) -> None:
+    def on_manifest_finalization(

Review Comment:
   The name here sounds a little difficult to understand. Can we do better here?
   I have a proposal, how about `on_pod_manifest_created`?



##########
providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -446,8 +446,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> 
tuple[DateTime | None
                                 progress_callback_lines.append(line)
                             else:  # previous log line is complete
                                 for line in progress_callback_lines:
-                                    if self._callbacks:
-                                        self._callbacks.progress_callback(
+                                    for callback in self._callbacks:
+                                        callback.progress_callback(
                                             line=line, client=self._client, 
mode=ExecutionMode.SYNC

Review Comment:
   Seems ok when callbacks are running in SYNC mode. What about async?
   Would probably require some more thinking



##########
providers/src/airflow/providers/cncf/kubernetes/callbacks.py:
##########
@@ -83,7 +123,34 @@ def on_pod_completion(*, pod: k8s.V1Pod, client: 
client_type, mode: str, **kwarg
         pass
 
     @staticmethod
-    def on_pod_cleanup(*, pod: k8s.V1Pod, client: client_type, mode: str, 
**kwargs):
+    def on_pod_wrapup(
+        *,
+        pod: k8s.V1Pod,
+        client: client_type,
+        mode: str,
+        operator: KubernetesPodOperator,
+        context: Context,
+        **kwargs,
+    ) -> None:
+        """
+        Invoke this callback after all pod completion callbacks but before the 
pod is deleted.
+
+        :param pod: the completed pod.
+        :param client: the Kubernetes client that can be used in the callback.
+        :param mode: the current execution mode, it's one of (`sync`, `async`).
+        """
+        pass

Review Comment:
   How can we send the completed `pod` here. That would require some tracking 
and filtering to send one. Why can this callback's role be achieved by 
`on_pod_completion`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to