hussein-awala commented on code in PR #43853:
URL: https://github.com/apache/airflow/pull/43853#discussion_r1835806364


##########
providers/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -655,6 +663,31 @@ def execute_sync(self, context: Context):
         if self.do_xcom_push:
             return result
 
+    @tenacity.retry(
+        wait=tenacity.wait_exponential(max=15),
+        retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
+        reraise=True,
+    )
+    def await_init_containers_completion(self, pod: k8s.V1Pod):
+        try:
+            if self.init_container_logs:
+                self.pod_manager.fetch_requested_init_container_logs(
+                    pod=pod,
+                    init_containers=self.init_container_logs,
+                    follow_logs=True,
+                )
+        except kubernetes.client.exceptions.ApiException as exc:
+            if exc.status and str(exc.status) == "401":
+                self.log.warning(
+                    "Failed to check container status due to permission error. Refreshing credentials and retrying."
+                )
+                self._refresh_cached_properties()
+                self.pod_manager.read_pod(
+                    pod=pod
+                )  # attempt using refreshed credentials, raises if still invalid
+                raise PodCredentialsExpiredFailure("Kubernetes credentials expired, retrying after refresh.")
+            raise exc

Review Comment:
   A big part of `await_pod_completion` is duplicated here. Would it be possible to move the duplicated part to a generalized method? (You can pass flags for the non-common checks.)
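   One way the suggested refactor could look, as a minimal sketch rather than the provider's code: `run_with_credential_refresh` and `ApiError` are hypothetical names, `ApiError` stands in for `kubernetes.client.exceptions.ApiException`, and a hand-rolled exponential-backoff loop replaces `tenacity.retry` so the example runs on the standard library alone. Instead of boolean flags, the step that differs between `await_pod_completion` and `await_init_containers_completion` is passed in as a callable, and the extra `read_pod` verification from the diff is omitted.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ApiError(Exception):
    """Hypothetical stand-in for kubernetes.client.exceptions.ApiException."""

    def __init__(self, status):
        super().__init__(f"API error {status}")
        self.status = status


def run_with_credential_refresh(
    action: Callable[[], T],
    refresh_credentials: Callable[[], None],
    *,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 15.0,
) -> T:
    """Run `action`; on HTTP 401, refresh credentials and retry with exponential backoff.

    Any other error, or a 401 on the last attempt, is re-raised unchanged.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except ApiError as exc:
            # Same check as the diff: the status may arrive as an int or a str.
            if str(exc.status) != "401" or attempt == max_attempts:
                raise
            refresh_credentials()
            # Delay doubles per attempt, capped like wait_exponential(max=15).
            time.sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable")  # the loop always returns or raises
```

   With a helper like this, the init-container path could reduce to something like `run_with_credential_refresh(lambda: self.pod_manager.fetch_requested_init_container_logs(...), self._refresh_cached_properties)`, and `await_pod_completion` would pass its own step the same way.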



##########
providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -118,7 +119,13 @@ def get_xcom_sidecar_container_resources(self) -> str | None:
 
 def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
     """Retrieve container status."""
-    container_statuses = pod.status.container_statuses if pod and pod.status else None
+    if pod and pod.status:
+        container_statuses = itertools.chain(
+            pod.status.container_statuses, pod.status.init_container_statuses
+        )
+    else:
+        container_statuses = None
+

Review Comment:
   Why do you need this change?
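   For context: the diff appears to extend the lookup so init containers can also be found by name. One caveat with it as written: in the Kubernetes Python client, `container_statuses` and `init_container_statuses` are `None` when unset (for example, on a pod without init containers), and `itertools.chain` only fails on a `None` argument once it is iterated. A None-safe sketch, using hypothetical dataclasses in place of `V1Pod`/`V1PodStatus`/`V1ContainerStatus` and assuming the rest of the original function returns the first status whose `name` matches:

```python
import itertools
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ContainerStatus:
    """Hypothetical stand-in for k8s.V1ContainerStatus."""
    name: str
    ready: bool = False


@dataclass
class PodStatus:
    """Hypothetical stand-in for k8s.V1PodStatus; unset lists stay None, as in the client."""
    container_statuses: Optional[List[ContainerStatus]] = None
    init_container_statuses: Optional[List[ContainerStatus]] = None


@dataclass
class Pod:
    """Hypothetical stand-in for k8s.V1Pod."""
    status: Optional[PodStatus] = None


def get_container_status(pod: Optional[Pod], container_name: str) -> Optional[ContainerStatus]:
    """Return the status of a main or init container by name, or None if not found."""
    if not pod or not pod.status:
        return None
    # `or []` keeps chain() from raising TypeError on a missing list when iterated.
    statuses = itertools.chain(
        pod.status.container_statuses or [],
        pod.status.init_container_statuses or [],
    )
    return next((s for s in statuses if s.name == container_name), None)
```

   A `chain` object is always truthy, so if the unchanged part of the function guards with `if container_statuses`, that guard no longer catches the missing-list case; normalizing each list first avoids relying on it.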



##########
providers/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -620,6 +625,9 @@ def execute_sync(self, context: Context):
                 self.callbacks.on_pod_creation(
                     pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
                 )
+
+            self.await_init_containers_completion(pod=self.pod)
+

Review Comment:
   Before we start reading the logs from the main containers, we wait for the pod to start. We need to do the same before reading the logs from the init containers; otherwise, reading will fail if the pod is not scheduled yet.
   
   If I'm not mistaken, when the init containers start, the pod phase will be `Pending`, but the pod should have an `Initialized` condition set to false. Any state after that should be OK (for example, `Initialized`, `Running`, ...).
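   The wait described above could be sketched as a polling loop over the pod phase and conditions. This is a minimal sketch that assumes the heuristic from the comment (any phase other than `Pending`, or `Pending` with an `Initialized` condition present, means the init containers have been scheduled); `PodSnapshot`, `init_containers_started` and `wait_for_init_containers` are hypothetical names, and `read_state` stands in for a call such as `self.pod_manager.read_pod(pod)`.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class PodCondition:
    """Hypothetical stand-in for k8s.V1PodCondition."""
    type: str    # e.g. "PodScheduled", "Initialized", "Ready"
    status: str  # "True", "False" or "Unknown", as in the Kubernetes API


@dataclass
class PodSnapshot:
    """Hypothetical view of the two pod.status fields this check needs."""
    phase: str
    conditions: List[PodCondition] = field(default_factory=list)


def init_containers_started(state: PodSnapshot) -> bool:
    """Heuristic from the review: any phase other than Pending, or Pending with an Initialized condition."""
    if state.phase != "Pending":
        return True
    # The condition only needs to exist; while init containers run it is "False".
    return any(c.type == "Initialized" for c in state.conditions)


def wait_for_init_containers(
    read_state: Callable[[], PodSnapshot],
    *,
    timeout: float = 120.0,
    poll_interval: float = 1.0,
) -> PodSnapshot:
    """Poll until init container logs should be readable, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        state = read_state()
        if init_containers_started(state):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"pod still {state.phase} after {timeout}s, likely not scheduled yet")
        time.sleep(poll_interval)
```

   Calling such a wait right before `fetch_requested_init_container_logs` would mirror how the main-container path waits for the pod to start; the timeout and poll interval here are arbitrary placeholders.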



-- 
This is an automated message from the Apache Git Service.